Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:41 UTC
[43/47] incubator-carbondata git commit: Merge remote-tracking branch
'carbon_master/master' into apache/master
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 74239a3,5c25406..22b6021
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@@ -31,7 -32,8 +32,8 @@@ import org.apache.spark.sql.catalyst.{S
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+ import org.apache.spark.sql.execution.ExplainCommand
-import org.apache.spark.sql.execution.command.{DimensionRelation, _}
+import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.DescribeCommand
import org.apache.spark.sql.hive.HiveQlWrapper
@@@ -332,109 -340,122 +340,123 @@@ class CarbonSqlParser(
// handle this case only when a CREATE TABLE token is found.
case Token("TOK_CREATETABLE", children) =>
- var fields: Seq[Field] = Seq[Field]()
- var tableComment: String = ""
- var tableProperties = Map[String, String]()
- var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
- var likeTableName: String = ""
- var storedBy: String = ""
- var ifNotExistPresent: Boolean = false
- var dbName: Option[String] = None
- var tableName: String = ""
- children.collect {
- // collecting all the field list
- case list@Token("TOK_TABCOLLIST", _) =>
- val cols = BaseSemanticAnalyzer.getColumns(list, true)
- if (cols != null) {
- val dupColsGrp = cols.asScala
- .groupBy(x => x.getName) filter { case (_, colList) => colList
- .size > 1
- }
- if (dupColsGrp.size > 0) {
- var columnName: String = ""
- dupColsGrp.toSeq.foreach(columnName += _._1 + ", ")
- columnName = columnName.substring(0, columnName.lastIndexOf(", "))
- val errorMessage = "Duplicate column name: " + columnName + " found in table " +
- ".Please check create table statement."
- throw new MalformedCarbonCommandException(errorMessage)
- }
- cols.asScala.map { col =>
- val columnName = col.getName()
- val dataType = Option(col.getType)
- val name = Option(col.getName())
- // This is to parse complex data types
- val x = col.getName + ' ' + col.getType
- val f: Field = anyFieldDef(new lexical.Scanner(x))
- match {
- case Success(field, _) => field
- case failureOrError => new Field(columnName, dataType, name, None, null,
- Some("columnar"))
+ var fields: Seq[Field] = Seq[Field]()
+ var tableComment: String = ""
+ var tableProperties = Map[String, String]()
+ var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
+ var likeTableName: String = ""
+ var storedBy: String = ""
+ var ifNotExistPresent: Boolean = false
+ var dbName: Option[String] = None
+ var tableName: String = ""
+
+ try {
+
+ children.collect {
+ // collecting all the field list
+ case list@Token("TOK_TABCOLLIST", _) =>
+ val cols = BaseSemanticAnalyzer.getColumns(list, true)
+ if (cols != null) {
+ val dupColsGrp = cols.asScala.groupBy(x => x.getName) filter {
+ case (_, colList) => colList.size > 1
+ }
+ if (dupColsGrp.size > 0) {
+ var columnName: String = ""
+ dupColsGrp.toSeq.foreach(columnName += _._1 + ", ")
+ columnName = columnName.substring(0, columnName.lastIndexOf(", "))
+ val errorMessage = "Duplicate column name: " + columnName + " found in table." +
+ " Please check the create table statement."
+ throw new MalformedCarbonCommandException(errorMessage)
}
- // the data type of the decimal type will be like decimal(10,0)
- // so checking the start of the string and taking the precision and scale.
- // resetting the data type with decimal
- if (f.dataType.getOrElse("").startsWith("decimal")) {
- val (precision, scale) = getScaleAndPrecision(col.getType)
- f.precision = precision
- f.scale = scale
- f.dataType = Some("decimal")
+ cols.asScala.map { col =>
+ val columnName = col.getName()
+ val dataType = Option(col.getType)
+ val name = Option(col.getName())
+ // This is to parse complex data types
+ val x = col.getName + ' ' + col.getType
+ val f: Field = anyFieldDef(new lexical.Scanner(x))
+ match {
+ case Success(field, _) => field
+ case failureOrError => new Field(columnName, dataType, name, None, null,
+ Some("columnar"))
+ }
+ // a decimal column's type string looks like decimal(10,0),
+ // so check the prefix, extract the precision and scale,
+ // and reset the data type to plain decimal
+ if (f.dataType.getOrElse("").startsWith("decimal")) {
+ val (precision, scale) = getScaleAndPrecision(col.getType)
+ f.precision = precision
+ f.scale = scale
+ f.dataType = Some("decimal")
+ }
+ fields ++= Seq(f)
}
- fields ++= Seq(f)
}
- }
- case Token("TOK_IFNOTEXISTS", _) =>
- ifNotExistPresent = true
-
- case t@Token("TOK_TABNAME", _) =>
- val (db, tblName) = extractDbNameTableName(t)
- dbName = db
- tableName = tblName.toLowerCase()
-
- case Token("TOK_TABLECOMMENT", child :: Nil) =>
- tableComment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
-
- case Token("TOK_TABLEPARTCOLS", list@Token("TOK_TABCOLLIST", _) :: Nil) =>
- val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
- if (cols != null) {
- cols.asScala.map { col =>
- val columnName = col.getName()
- val dataType = Option(col.getType)
- val comment = col.getComment
- val partitionCol = new PartitionerField(columnName, dataType, comment)
- partitionCols ++= Seq(partitionCol)
+ case Token("TOK_IFNOTEXISTS", _) =>
+ ifNotExistPresent = true
+
+ case t@Token("TOK_TABNAME", _) =>
+ val (db, tblName) = extractDbNameTableName(t)
+ dbName = db
+ tableName = tblName.toLowerCase()
+
+ case Token("TOK_TABLECOMMENT", child :: Nil) =>
+ tableComment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+
+ case Token("TOK_TABLEPARTCOLS", list@Token("TOK_TABCOLLIST", _) :: Nil) =>
+ val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
+ if (cols != null) {
+ cols.asScala.map { col =>
+ val columnName = col.getName()
+ val dataType = Option(col.getType)
+ val comment = col.getComment
+ val partitionCol = new PartitionerField(columnName, dataType, comment)
+ partitionCols ++= Seq(partitionCol)
+ }
}
- }
- case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
- tableProperties ++= getProperties(list)
+ case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
+ tableProperties ++= getProperties(list)
- case Token("TOK_LIKETABLE", child :: Nil) =>
- likeTableName = child.getChild(0).getText()
+ case Token("TOK_LIKETABLE", child :: Nil) =>
+ likeTableName = child.getChild(0).getText()
- case Token("TOK_STORAGEHANDLER", child :: Nil) =>
- storedBy = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+ case Token("TOK_STORAGEHANDLER", child :: Nil) =>
+ storedBy = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
- case _ => // Unsupport features
- }
+ case _ => // Unsupported features
+ }
- if (!storedBy.equals(CarbonContext.datasourceName)) {
- // TODO: should execute by Hive instead of error
- sys.error("Not a carbon format request")
- }
+ if (!(storedBy.equals(CarbonContext.datasourceName) ||
+ storedBy.equals(CarbonContext.datasourceShortName))) {
+ // TODO: should execute by Hive instead of error
+ sys.error("Not a carbon format request")
+ }
- // validate tblProperties
- if (!CommonUtil.validateTblProperties(tableProperties, fields)) {
- throw new MalformedCarbonCommandException("Invalid table properties")
- }
- // prepare table model of the collected tokens
- val tableModel: tableModel = prepareTableModel(ifNotExistPresent, dbName, tableName, fields,
- partitionCols,
- tableProperties)
-
- // get logical plan.
- CreateTable(tableModel)
+ // validate tblProperties
+ if (!CommonUtil.validateTblProperties(tableProperties, fields)) {
+ throw new MalformedCarbonCommandException("Invalid table properties")
+ }
+ // prepare table model of the collected tokens
+ val tableModel: tableModel = prepareTableModel(ifNotExistPresent,
+ dbName,
+ tableName,
+ fields,
+ partitionCols,
+ tableProperties)
+
+ // get logical plan.
+ CreateTable(tableModel)
+ }
+ catch {
+ case ce: MalformedCarbonCommandException =>
+ val message = if (tableName.isEmpty) "Create table command failed. "
+ else if (!dbName.isDefined) s"Create table command failed for $tableName. "
+ else s"Create table command failed for ${dbName.get}.$tableName. "
+ LOGGER.audit(message + ce.getMessage)
+ throw ce
+ }
}
}
@@@ -1295,7 -1343,16 +1324,16 @@@
protected lazy val cleanFiles: Parser[LogicalPlan] =
CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
- case schemaName ~ cubeName => CleanFiles(schemaName, cubeName.toLowerCase())
+ case databaseName ~ tableName => CleanFiles(databaseName, tableName.toLowerCase())
}
+ protected lazy val explainPlan: Parser[LogicalPlan] =
+ (EXPLAIN ~> opt(EXTENDED)) ~ startCommand ^^ {
+ case isExtended ~ logicalPlan =>
+ logicalPlan match {
+ case plan: CreateTable => ExplainCommand(logicalPlan, extended = isExtended.isDefined)
+ case _ => ExplainCommand(OneRowRelation)
+ }
+ }
+
}
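
The main behavioral change in this parser hunk is the audit-then-rethrow wrapper: the whole CREATE TABLE token walk now runs inside a try/catch so that a MalformedCarbonCommandException is audit-logged with the best identifier available (nothing, table only, or db.table) before being rethrown. A minimal, self-contained sketch of that pattern, with java.util.logging standing in for CarbonData's audit logger and a placeholder parse body:

    import java.util.logging.Logger

    // Stand-in for CarbonData's MalformedCarbonCommandException.
    class MalformedCarbonCommandException(msg: String) extends Exception(msg)

    object AuditedParse {
      private val LOGGER = Logger.getLogger(getClass.getName)

      // Run the parse body; on a malformed-command error, audit a message
      // qualified by whatever identifiers were collected so far, then rethrow.
      def parseCreateTable(dbName: Option[String], tableName: String)(body: => Unit): Unit = {
        try {
          body
        } catch {
          case ce: MalformedCarbonCommandException =>
            val message =
              if (tableName.isEmpty) "Create table command failed. "
              else if (dbName.isEmpty) s"Create table command failed for $tableName. "
              else s"Create table command failed for ${dbName.get}.$tableName. "
            LOGGER.severe(message + ce.getMessage) // the diff uses LOGGER.audit
            throw ce
        }
      }
    }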
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index e8f6e11,1162db4..9a16f77
--- a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@@ -24,15 -25,17 +25,17 @@@ import org.apache.spark.sql.catalyst.In
import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericMutableRow}
import org.carbondata.core.carbon.metadata.encoder.Encoding
-import org.carbondata.query.carbonfilterinterface.{ExpressionType, RowIntf}
-import org.carbondata.query.expression.{ColumnExpression, ExpressionResult, UnknownExpression}
-import org.carbondata.query.expression.conditional.ConditionalExpression
-import org.carbondata.query.expression.exception.FilterUnsupportedException
+import org.carbondata.scan.expression.{ColumnExpression, ExpressionResult, UnknownExpression}
+import org.carbondata.scan.expression.conditional.ConditionalExpression
+import org.carbondata.scan.expression.exception.FilterUnsupportedException
+import org.carbondata.scan.filter.intf.{ExpressionType, RowIntf}
import org.carbondata.spark.util.CarbonScalaUtil
- class SparkUnknownExpression(sparkExp: SparkExpression)
+ class SparkUnknownExpression(var sparkExp: SparkExpression)
extends UnknownExpression with ConditionalExpression {
+ private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
+ private var isExecutor: Boolean = false
children.addAll(getColumnList())
override def evaluate(carbonRowInstance: RowIntf): ExpressionResult = {
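
The constructor change above (sparkExp becomes a var, with a cached evaluateExpression function and an isExecutor flag) exists so the expression can be re-bound later without changing callers. A stripped-down sketch of that caching pattern, with a stand-in Expr trait instead of Spark's catalyst Expression:

    // Stand-in for org.apache.spark.sql.catalyst.expressions.Expression.
    trait Expr { def eval(row: Array[Any]): Any }

    class UnknownExpr(var sparkExp: Expr) {
      // Captured once so it can be swapped (for example when the expression
      // is re-created on the executor side) without touching call sites.
      private var evaluateExpression: Array[Any] => Any = sparkExp.eval
      private var isExecutor: Boolean = false

      def evaluate(row: Array[Any]): Any = evaluateExpression(row)

      // Hypothetical rebinding hook illustrating why these members are vars.
      def rebind(newExp: Expr): Unit = {
        sparkExp = newExp
        evaluateExpression = newExp.eval
        isExecutor = true
      }
    }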
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d6cbaf4,1dd066f..01bb218
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@@ -156,8 -157,13 +156,13 @@@ case class AlterTableModel(dbName: Opti
case class CompactionModel(compactionSize: Long,
compactionType: CompactionType,
carbonTable: CarbonTable,
- cubeCreationTime: Long)
+ tableCreationTime: Long)
+ case class CompactionCallableModel(hdfsStoreLocation: String, carbonLoadModel: CarbonLoadModel,
+ partitioner: Partitioner, storeLocation: String, carbonTable: CarbonTable, kettleHomePath: String,
+ cubeCreationTime: Long, loadsToMerge: util.List[LoadMetadataDetails], sqlContext: SQLContext,
+ compactionType: CompactionType)
+
object TableNewProcessor {
def apply(cm: tableModel, sqlContext: SQLContext): TableInfo = {
new TableNewProcessor(cm, sqlContext).process
@@@ -767,12 -1201,12 +772,12 @@@ private[sql] case class AlterTableCompa
def run(sqlContext: SQLContext): Seq[Row] = {
// TODO : Implement it.
- var tableName = alterTableModel.tableName
+ val tableName = alterTableModel.tableName
- val schemaName = getDB.getDatabaseName(alterTableModel.dbName, sqlContext)
+ val databaseName = getDB.getDatabaseName(alterTableModel.dbName, sqlContext)
if (null == org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
- .getCarbonTable(schemaName + "_" + tableName)) {
- logError("alter table failed. table not found: " + schemaName + "." + tableName)
- sys.error("alter table failed. table not found: " + schemaName + "." + tableName)
+ .getCarbonTable(databaseName + "_" + tableName)) {
+ logError("alter table failed. table not found: " + databaseName + "." + tableName)
+ sys.error("alter table failed. table not found: " + databaseName + "." + tableName)
}
val relation =
@@@ -975,20 -1417,23 +980,20 @@@ private[sql] case class DeleteLoadsByLo
throw new MalformedCarbonCommandException(errorMessage)
}
- var carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
+ val carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
- .getCarbonTable(schemaName + '_' + tableName)
+ .getCarbonTable(dbName + '_' + tableName)
- var segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+ val segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
if (null == carbonTable) {
- var relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
- Option(schemaName),
- tableName,
- None
- )(sqlContext).asInstanceOf[CarbonRelation]
+ var relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
}
- var path = carbonTable.getMetaDataFilepath()
+ val path = carbonTable.getMetaDataFilepath()
- var invalidLoadTimestamps = segmentStatusManager
+ val invalidLoadTimestamps = segmentStatusManager
.updateDeletionStatus(loadDate, path, timeObj.asInstanceOf[java.lang.Long]).asScala
if(invalidLoadTimestamps.isEmpty) {
- LOGGER.audit(s"Delete load by load date is successfull for $schemaName.$tableName.")
+ LOGGER.audit(s"Delete load by load date is successfull for $dbName.$tableName.")
}
else {
sys.error("Delete load by load date is failed. No matching load found.")
@@@ -1491,17 -2116,16 +1503,17 @@@ private[sql] case class DeleteLoadByDat
def run(sqlContext: SQLContext): Seq[Row] = {
- val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
- LOGGER.audit(s"The delete load by date request has been received for $schemaName.$cubeName")
+ val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
+ LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
+ val identifier = TableIdentifier(tableName, Option(dbName))
val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
- .lookupRelation1(Some(schemaName), cubeName, None)(sqlContext).asInstanceOf[CarbonRelation]
+ .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
var level: String = ""
- var carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata
+ val carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata
- .getInstance().getCarbonTable(schemaName + '_' + cubeName)
+ .getInstance().getCarbonTable(dbName + '_' + tableName)
if (relation == null) {
- LOGGER.audit(s"The delete load by date is failed. Table $schemaName.$cubeName does not exist")
- sys.error(s"Table $schemaName.$cubeName does not exist")
+ LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
+ sys.error(s"Table $dbName.$tableName does not exist")
}
val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
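
Two themes run through this file's hunks: schemaName/cubeName identifiers give way to databaseName/tableName, and the compaction inputs are bundled into the new CompactionCallableModel case class rather than passed as a long positional argument list. A small sketch of that parameter-object refactor, with stand-in types for the CarbonData classes named in the diff:

    // Stand-in for LoadMetadataDetails and friends.
    case class LoadDetail(name: String)

    case class CompactionCallableModel(
        hdfsStoreLocation: String,
        storeLocation: String,
        kettleHomePath: String,
        tableCreationTime: Long,
        loadsToMerge: List[LoadDetail])

    object CompactionSketch {
      // A callable that needs the whole bundle takes one model, not ten params.
      def compact(model: CompactionCallableModel): Unit =
        model.loadsToMerge.foreach(l => println(s"merging ${l.name}"))

      def main(args: Array[String]): Unit =
        compact(CompactionCallableModel("/store", "/tmp/store", "/kettle",
          System.currentTimeMillis(), List(LoadDetail("Segment_0"), LoadDetail("Segment_1"))))
    }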
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index 07b505c,e18e1d3..0775fea
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@@ -155,11 -207,19 +155,19 @@@ class CarbonMetastoreCatalog(hiveContex
// creating zookeeper instance once.
// if zookeeper is configured as carbon lock type.
- if (CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.LOCK_TYPE_DEFAULT)
- .equalsIgnoreCase(CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER)) {
- val zookeeperUrl = hiveContext.getConf("spark.deploy.zookeeper.url", "127.0.0.1:2181")
- val zookeeperUrl: String = hive.getConf(CarbonCommonConstants.ZOOKEEPER_URL, null)
++ val zookeeperUrl: String = hiveContext.getConf(CarbonCommonConstants.ZOOKEEPER_URL, null)
+ if (zookeeperUrl != null) {
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperUrl)
ZookeeperInit.getInstance(zookeeperUrl)
+ LOGGER.info("Zookeeper url is configured. Taking the zookeeper as lock type.")
+ var configuredLockType = CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.LOCK_TYPE)
+ if (null == configuredLockType) {
+ configuredLockType = CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
+ CarbonProperties.getInstance
+ .addProperty(CarbonCommonConstants.LOCK_TYPE,
+ configuredLockType)
+ }
}
if (metadataPath == null) {
@@@ -429,25 -489,16 +437,16 @@@
}
}
- metadata.cubesMeta -= metadata.cubesMeta.filter(
- c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(schemaName) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(cubeName))(0)
+ metadata.tablesMeta -= metadata.tablesMeta.filter(
+ c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName) &&
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))(0)
org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
- .removeTable(schemaName + "_" + cubeName)
+ .removeTable(dbName + "_" + tableName)
- try {
- sqlContext.sql(s"DROP TABLE $dbName.$tableName").collect()
- } catch {
- case e: Exception =>
- LOGGER.audit(
- s"Error While deleting the table $dbName.$tableName during drop Table" + e.getMessage)
- }
- logInfo(s"Table $tableName of $dbName Database dropped syccessfully.")
- LOGGER.info("Table " + tableName + " of " + dbName + " Database dropped syccessfully.")
-
- sqlContext.asInstanceOf[HiveContext].runSqlHive(s"DROP TABLE IF EXISTS $schemaName.$cubeName")
++ sqlContext.asInstanceOf[HiveContext].runSqlHive(s"DROP TABLE IF EXISTS $dbName.$tableName")
}
- private def getTimestampFileAndType(schemaName: String, cubeName: String) = {
+ private def getTimestampFileAndType(databaseName: String, tableName: String) = {
val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
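
The resolved CarbonMetastoreCatalog hunk inverts how the ZooKeeper lock is activated: instead of checking a configured lock type first, it reads the ZooKeeper URL, and if one is present it registers the URL, initializes the ZooKeeper client, and defaults the lock type to ZooKeeper only when no lock type was configured explicitly. A sketch of that defaulting logic, with a plain Properties object standing in for CarbonProperties and assumed key/value names:

    import java.util.Properties

    object LockTypeInit {
      // Key and value names here are assumptions for illustration.
      val ZOOKEEPER_URL = "spark.deploy.zookeeper.url"
      val LOCK_TYPE = "carbon.lock.type"
      val LOCK_TYPE_ZOOKEEPER = "ZOOKEEPERLOCK"

      def init(conf: Map[String, String], props: Properties): Unit = {
        conf.get(ZOOKEEPER_URL).foreach { url =>
          props.setProperty(ZOOKEEPER_URL, url)
          // CarbonData calls ZookeeperInit.getInstance(url) at this point.
          if (props.getProperty(LOCK_TYPE) == null) {
            props.setProperty(LOCK_TYPE, LOCK_TYPE_ZOOKEEPER)
          }
        }
      }
    }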
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 85c6a1d,49e6702..748a408
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@@ -405,44 -464,48 +410,48 @@@ object CarbonDataRDDFactory extends Log
*
* @param futureList
*/
- def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]]): Unit = {
- breakable {
- while (true) {
-
- val loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
- hdfsStoreLocation,
- carbonLoadModel,
- partitioner.partitionCount,
- compactionModel.compactionSize,
- segList,
- compactionModel.compactionType
- )
- if (loadsToMerge.size() > 1) {
- loadsToMerge.asScala.foreach(seg => {
- logger.info("load identified for merge is " + seg.getLoadName)
- }
- )
+ def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]], loadsToMerge: util
+ .List[LoadMetadataDetails]): Unit = {
- val future: Future[Void] = executor.submit(new CompactionCallable(hdfsStoreLocation,
- carbonLoadModel,
- partitioner,
- storeLocation,
- compactionModel.carbonTable,
- kettleHomePath,
- compactionModel.tableCreationTime,
- loadsToMerge,
- sqlContext
- )
- )
- futureList.add(future)
- segList = CarbonDataMergerUtil
- .filterOutAlreadyMergedSegments(segList, loadsToMerge)
- }
- else {
- break
- }
- }
+ loadsToMerge.asScala.foreach(seg => {
+ logger.info("loads identified for merge is " + seg.getLoadName)
}
+ )
+
+ val compactionCallableModel = CompactionCallableModel(hdfsStoreLocation,
+ carbonLoadModel,
+ partitioner,
+ storeLocation,
+ compactionModel.carbonTable,
+ kettleHomePath,
- compactionModel.cubeCreationTime,
++ compactionModel.tableCreationTime,
+ loadsToMerge,
+ sqlContext,
+ compactionModel.compactionType
+ )
+
+ val future: Future[Void] = executor
+ .submit(new CompactionCallable(compactionCallableModel
+ )
+ )
+ futureList.add(future)
+ }
+ }
+
+ def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = {
+ // Delete any partially loaded data if present.
+ // In some cases a segment folder exists in the store without an entry in
+ // the status file, so those folders are deleted here.
+ try {
+ CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
+ }
+ catch {
+ case e: Exception =>
+ logger
+ .error("Exception in compaction thread while clean up of stale segments " + e
+ .getMessage
+ )
}
}
@@@ -713,50 -776,59 +722,59 @@@
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(hdfsStoreLocation,
carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName,
partitioner.partitionCount, currentLoadCount.toString)
- val status = new
- CarbonDataLoadRDD(sqlContext.sparkContext,
- new DataLoadResultImpl(),
- carbonLoadModel,
- storeLocation,
- hdfsStoreLocation,
- kettleHomePath,
- partitioner,
- columinar,
- currentRestructNumber,
- currentLoadCount,
- tableCreationTime,
- schemaLastUpdatedTime,
- blocksGroupBy,
- isTableSplitPartition
- ).collect()
- val newStatusMap = scala.collection.mutable.Map.empty[String, String]
- status.foreach { eachLoadStatus =>
- val state = newStatusMap.get(eachLoadStatus._1)
- state match {
- case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
- newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
- case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
- if eachLoadStatus._2.getLoadStatus == CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
- newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
- case _ =>
- newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
- }
- }
-
var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- newStatusMap.foreach {
- case (key, value) =>
- if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
- loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
- !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
- loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+ var status: Array[(String, LoadMetadataDetails)] = null
+ try {
+ status = new
- CarbonDataLoadRDD(sc.sparkContext,
++ CarbonDataLoadRDD(sqlContext.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ storeLocation,
+ hdfsStoreLocation,
+ kettleHomePath,
+ partitioner,
+ columinar,
+ currentRestructNumber,
+ currentLoadCount,
- cubeCreationTime,
++ tableCreationTime,
+ schemaLastUpdatedTime,
+ blocksGroupBy,
+ isTableSplitPartition
+ ).collect()
+ val newStatusMap = scala.collection.mutable.Map.empty[String, String]
+ status.foreach { eachLoadStatus =>
+ val state = newStatusMap.get(eachLoadStatus._1)
+ state match {
+ case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
+ newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+ case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+ if eachLoadStatus._2.getLoadStatus ==
+ CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
+ newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+ case _ =>
+ newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
}
- }
+ }
- if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE &&
- partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) {
- loadStatus = partitionStatus
+ newStatusMap.foreach {
+ case (key, value) =>
+ if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
+ loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+ } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
+ !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
+ loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+ }
+ }
+
+ if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE &&
+ partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) {
+ loadStatus = partitionStatus
+ }
+ } catch {
+ case ex: Throwable =>
+ loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+ logInfo("DataLoad failure")
+ logger.error(ex)
}
if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
@@@ -779,10 -851,12 +797,10 @@@
aggTables.asScala.foreach { aggTableName =>
CarbonLoaderUtil
.deleteSlice(partitioner.partitionCount, carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, aggTableName, hdfsStoreLocation,
- currentRestructNumber, newSlice
- )
+ carbonLoadModel.getTableName, hdfsStoreLocation, currentRestructNumber, newSlice)
}
}
- message = "Dataload failure"
+ message = "DataLoad failure"
}
logInfo("********clean up done**********")
logger.audit(s"Data load is failed for " +
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 78538f1,70d0cb1..e79937f
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@@ -35,8 -36,10 +36,11 @@@ import org.carbondata.core.carbon.{Carb
import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
import org.carbondata.core.constants.CarbonCommonConstants
import org.carbondata.core.datastorage.store.impl.FileFactory
+ import org.carbondata.core.locks.CarbonLockFactory
+ import org.carbondata.core.locks.LockUsage
+ import org.carbondata.core.util.CarbonProperties
import org.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
import org.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
import org.carbondata.spark.partition.reader.{CSVParser, CSVReader}
import org.carbondata.spark.tasks.DictionaryWriterTask
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
index e22869c,7b15cbf..f687006
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
@@@ -67,83 -70,119 +69,119 @@@ class CarbonMergerRDD[K, V]
override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter = new Iterator[(K, V)] {
- var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
- val tempLocationKey: String = carbonLoadModel.getDatabaseName + '_' + carbonLoadModel
- .getTableName + carbonLoadModel.getTaskNo
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ val tempLocationKey: String = CarbonCommonConstants
+ .COMPACTION_KEY_WORD + '_' + carbonLoadModel
+ .getDatabaseName + '_' + carbonLoadModel
+ .getTableName + '_' + carbonLoadModel.getTaskNo
+
+ val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+ if (null != storeLocations && storeLocations.length > 0) {
+ storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+ }
+ if (storeLocation == null) {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
+ storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+ LOGGER.info("Temp storeLocation taken is " + storeLocation)
+ var mergeStatus = false
+ var mergeNumber = ""
+ try {
+ var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+ val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
- // sorting the table block info List.
- var tableBlockInfoList = carbonSparkPartition.tableBlockInfos
+ // sorting the table block info List.
+ var tableBlockInfoList = carbonSparkPartition.tableBlockInfos
- Collections.sort(tableBlockInfoList)
+ Collections.sort(tableBlockInfoList)
- val segmentMapping: java.util.Map[String, TaskBlockInfo] =
- CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
+ val segmentMapping: java.util.Map[String, TaskBlockInfo] =
+ CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
- val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
- CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
+ val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
+ CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
- carbonLoadModel.setStorePath(hdfsStoreLocation)
+ carbonLoadModel.setStorePath(hdfsStoreLocation)
- // taking the last table block info for getting the segment properties.
- val listMetadata = dataFileMetadataSegMapping.get(tableBlockInfoList.get
- (tableBlockInfoList.size()-1).getSegmentId())
+ // taking the last table block info for getting the segment properties.
+ val listMetadata = dataFileMetadataSegMapping.get(tableBlockInfoList.get
+ (tableBlockInfoList.size() - 1).getSegmentId()
+ )
- val colCardinality: Array[Int] = listMetadata.get(listMetadata.size() - 1).getSegmentInfo
- .getColumnCardinality
+ val colCardinality: Array[Int] = listMetadata.get(listMetadata.size() - 1).getSegmentInfo
+ .getColumnCardinality
- val segmentProperties = new SegmentProperties(
- listMetadata.get(listMetadata.size() - 1).getColumnInTable,
- colCardinality
- )
+ val segmentProperties = new SegmentProperties(
+ listMetadata.get(listMetadata.size() - 1).getColumnInTable,
+ colCardinality
+ )
- val exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
- factTableName, hdfsStoreLocation, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
- dataFileMetadataSegMapping
- )
- val exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, schemaName,
++ val exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
+ factTableName, hdfsStoreLocation, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+ dataFileMetadataSegMapping
+ )
- // fire a query and get the results.
- var result2: util.List[RawResultIterator] = null
- try {
- result2 = exec.processTableBlocks()
- } catch {
- case e: Throwable =>
- exec.clearDictionaryFromQueryModel
- LOGGER.error(e)
- if (null != e.getMessage) {
- sys.error("Exception occurred in query execution :: " + e.getMessage)
- } else {
- sys.error("Exception occurred in query execution.Please check logs.")
- }
- }
+ // fire a query and get the results.
+ var result2: util.List[RawResultIterator] = null
+ try {
+ result2 = exec.processTableBlocks()
+ } catch {
+ case e: Throwable =>
+ exec.clearDictionaryFromQueryModel
+ LOGGER.error(e)
+ if (null != e.getMessage) {
+ sys.error("Exception occurred in query execution :: " + e.getMessage)
+ } else {
+ sys.error("Exception occurred in query execution.Please check logs.")
+ }
+ }
- val mergeNumber = mergedLoadName
- .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
- CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
+ mergeNumber = mergedLoadName
+ .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
+ CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
+ )
+
- val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(schemaName,
++ val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
+ factTableName,
+ carbonLoadModel.getTaskNo,
+ "0",
+ mergeNumber,
+ true
)
- val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
- factTableName,
- carbonLoadModel.getTaskNo,
- "0",
- mergeNumber
- )
+ carbonLoadModel.setSegmentId(mergeNumber)
+ carbonLoadModel.setPartitionId("0")
+ val merger =
+ new RowResultMerger(result2,
- schemaName,
++ databaseName,
+ factTableName,
+ segmentProperties,
+ tempStoreLoc,
+ carbonLoadModel,
+ colCardinality
+ )
+ mergeStatus = merger.mergerSlice()
- carbonLoadModel.setSegmentId(mergeNumber)
- carbonLoadModel.setPartitionId("0")
- val merger =
- new RowResultMerger(result2,
- databaseName,
- factTableName,
- segmentProperties,
- tempStoreLoc,
- carbonLoadModel,
- colCardinality
- )
- val mergeStatus = merger.mergerSlice()
+ }
+ catch {
+ case e: Exception =>
+ LOGGER.error(e)
+ throw e
+ }
+ finally {
+ // delete temp location data
+ val newSlice = CarbonCommonConstants.LOAD_FOLDER + mergeNumber
+ try {
+ val isCompactionFlow = true
+ CarbonLoaderUtil
+ .deleteLocalDataLoadFolderLocation(carbonLoadModel, newSlice, isCompactionFlow)
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e)
+ }
+ }
var finished = false
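
The merger now picks its temporary store location per task: a random entry from the configured local dirs, falling back to java.io.tmpdir, made unique with a nanosecond timestamp and the split index, and cleaned up in the finally block above. A sketch of the selection step:

    import scala.util.Random

    object TempLocation {
      // Pick a random configured local dir (or java.io.tmpdir as fallback)
      // and make the resulting path unique for this task.
      def choose(configuredDirs: Array[String], splitIndex: Int): String = {
        val base =
          if (configuredDirs != null && configuredDirs.nonEmpty) {
            configuredDirs(Random.nextInt(configuredDirs.length))
          } else {
            System.getProperty("java.io.tmpdir")
          }
        base + '/' + System.nanoTime() + '/' + splitIndex
      }
    }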
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
index 00bf5b2,0000000..5a1dedc
mode 100644,000000..100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
@@@ -1,253 -1,0 +1,290 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.carbondata.spark.rdd
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.DistributionUtil
+
+import org.carbondata.common.CarbonIterator
+import org.carbondata.common.logging.LogServiceFactory
+import org.carbondata.core.cache.dictionary.Dictionary
+import org.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
+import org.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsRecorder}
+import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
+import org.carbondata.scan.executor.QueryExecutorFactory
+import org.carbondata.scan.expression.Expression
+import org.carbondata.scan.model.QueryModel
+import org.carbondata.scan.result.BatchResult
+import org.carbondata.scan.result.iterator.ChunkRowIterator
+import org.carbondata.spark.RawValue
+import org.carbondata.spark.load.CarbonLoaderUtil
+import org.carbondata.spark.util.QueryPlanUtil
+
+class CarbonSparkPartition(rddId: Int, val idx: Int,
+ val locations: Array[String],
+ val tableBlockInfos: util.List[TableBlockInfo])
+ extends Partition {
+
+ override val index: Int = idx
+
+ // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations)
+ override def hashCode(): Int = {
+ 41 * (41 + rddId) + idx
+ }
+}
+
+ /**
+ * This RDD is used to query CarbonData files. Before sending tasks to scan
+ * the files, it leverages CarbonData's index information to do file-level
+ * filtering on the driver side.
+ */
+class CarbonScanRDD[V: ClassTag](
+ sc: SparkContext,
+ queryModel: QueryModel,
+ filterExpression: Expression,
+ keyClass: RawValue[V],
+ @transient conf: Configuration,
+ tableCreationTime: Long,
+ schemaLastUpdatedTime: Long,
+ baseStoreLocation: String)
+ extends RDD[V](sc, Nil) with Logging {
+
+ val defaultParallelism = sc.defaultParallelism
+
+ override def getPartitions: Array[Partition] = {
+ val statisticRecorder = new QueryStatisticsRecorder(queryModel.getQueryId)
+ val startTime = System.currentTimeMillis()
+ val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
+ QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
+
+ val result = new util.ArrayList[Partition](defaultParallelism)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ // set filter resolver tree
+ try {
+ // before applying filter check whether segments are available in the table.
+ val splits = carbonInputFormat.getSplits(job)
+ if (!splits.isEmpty) {
- var filterResolver = carbonInputFormat
++ val filterResolver = carbonInputFormat
+ .getResolvedFilter(job.getConfiguration, filterExpression)
+ CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
+ queryModel.setFilterExpressionResolverTree(filterResolver)
+ }
+ }
+ catch {
+ case e: Exception =>
+ LOGGER.error(e)
+ sys.error("Exception occurred in query execution :: " + e.getMessage)
+ }
+ // get splits
+ val splits = carbonInputFormat.getSplits(job)
+ if (!splits.isEmpty) {
+ val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+
+ val blockList = carbonInputSplits.map(inputSplit =>
+ new TableBlockInfo(inputSplit.getPath.toString,
+ inputSplit.getStart, inputSplit.getSegmentId,
+ inputSplit.getLocations, inputSplit.getLength
+ ).asInstanceOf[Distributable]
+ )
+ if (blockList.nonEmpty) {
+ // group blocks to nodes, tasks
+ val startTime = System.currentTimeMillis
+ var statistic = new QueryStatistic
+ val activeNodes = DistributionUtil
+ .ensureExecutorsAndGetNodeList(blockList.toArray, sparkContext)
+ val nodeBlockMapping =
+ CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
+ activeNodes.toList.asJava
+ )
+ val timeElapsed: Long = System.currentTimeMillis - startTime
+ statistic.addStatistics("Total Time taken in block(s) allocation", System.currentTimeMillis)
+ statisticRecorder.recordStatistics(statistic);
+ statistic = new QueryStatistic
+ var i = 0
+ // Create Spark Partition for each task and assign blocks
+ nodeBlockMapping.asScala.foreach { entry =>
+ entry._2.asScala.foreach { blocksPerTask => {
+ val tableBlockInfo = blocksPerTask.asScala.map(_.asInstanceOf[TableBlockInfo])
+ if (blocksPerTask.size() != 0) {
+ result
+ .add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, tableBlockInfo.asJava))
+ i += 1
+ }
+ }
+ }
+ }
+ val noOfBlocks = blockList.size
+ val noOfNodes = nodeBlockMapping.size
+ val noOfTasks = result.size()
+ logInfo(s"Identified no.of.Blocks: $noOfBlocks,"
+ + s"parallelism: $defaultParallelism , " +
+ s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
+ )
+ statistic.addStatistics("Time taken to identify Block(s) to scan", System.currentTimeMillis)
+ statisticRecorder.recordStatistics(statistic);
+ statisticRecorder.logStatistics
+ result.asScala.foreach { r =>
+ val cp = r.asInstanceOf[CarbonSparkPartition]
+ logInfo(s"Node : " + cp.locations.toSeq.mkString(",")
+ + ", No.Of Blocks : " + cp.tableBlockInfos.size()
+ )
+ }
+ } else {
+ logInfo("No blocks identified to scan")
+ val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
+ result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
+ }
+ }
+ else {
+ logInfo("No valid segments found to scan")
+ val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
+ result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
+ }
+ result.toArray(new Array[Partition](result.size()))
+ }
+
+ override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val iter = new Iterator[V] {
+ var rowIterator: CarbonIterator[Array[Any]] = _
+ var queryStartTime: Long = 0
+ try {
+ val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
+ if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
+ queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
+ // fill table block info
+ queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
+ queryStartTime = System.currentTimeMillis
+
+ val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
+ logInfo("*************************" + carbonPropertiesFilePath)
+ if (null == carbonPropertiesFilePath) {
+ System.setProperty("carbon.properties.filepath",
+ System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
+ }
+ // execute query
+ rowIterator = new ChunkRowIterator(
+ QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel).
+ asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
+
+ }
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e)
+ if (null != e.getMessage) {
+ sys.error("Exception occurred in query execution :: " + e.getMessage)
+ } else {
+ sys.error("Exception occurred in query execution.Please check logs.")
+ }
+ }
+
+ var havePair = false
+ var finished = false
+ var recordCount = 0
+
+ override def hasNext: Boolean = {
+ if (!finished && !havePair) {
+ finished = (null == rowIterator) || (!rowIterator.hasNext)
+ havePair = !finished
+ }
+ if (finished) {
+ clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
- if(null!=queryModel.getStatisticsRecorder) {
++ if (null != queryModel.getStatisticsRecorder) {
++ val queryStatistic = new QueryStatistic
++ queryStatistic
++ .addStatistics("Total Time taken to execute the query in executor Side",
++ System.currentTimeMillis - queryStartTime
++ )
++ queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
+ queryModel.getStatisticsRecorder.logStatistics();
+ }
+ }
+ !finished
+ }
+
+ override def next(): V = {
+ if (!hasNext) {
+ throw new java.util.NoSuchElementException("End of stream")
+ }
+ havePair = false
+ recordCount += 1
+ if (queryModel.getLimit != -1 && recordCount >= queryModel.getLimit) {
+ clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
- if(null!=queryModel.getStatisticsRecorder) {
++ if (null != queryModel.getStatisticsRecorder) {
++ val queryStatistic = new QueryStatistic
++ queryStatistic
++ .addStatistics("Total Time taken to execute the query in executor Side",
++ System.currentTimeMillis - queryStartTime
++ )
++ queryModel.getStatisticsRecorder.recordStatistics(queryStatistic);
+ queryModel.getStatisticsRecorder.logStatistics();
+ }
+ }
+ keyClass.getValue(rowIterator.next())
+ }
+ def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = {
+ if (null != columnToDictionaryMap) {
+ org.carbondata.spark.util.CarbonQueryUtil
+ .clearColumnDictionaryCache(columnToDictionaryMap)
+ }
+ }
+ }
+ iter
+ }
+
+ /**
+ * Get the preferred locations where to launch this task.
+ */
- override def getPreferredLocations(partition: Partition): Seq[String] = {
- val theSplit = partition.asInstanceOf[CarbonSparkPartition]
- theSplit.locations.filter(_ != "localhost")
- }
++ override def getPreferredLocations(partition: Partition): Seq[String] = {
++ val theSplit = partition.asInstanceOf[CarbonSparkPartition]
++ val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
++ val tableBlocks = theSplit.tableBlockInfos
++ // node name and count mapping
++ val blockMap = new util.LinkedHashMap[String, Integer]()
++
++ tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
++ location => {
++ if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
++ val currentCount = blockMap.get(location)
++ if (currentCount == null) {
++ blockMap.put(location, 1)
++ } else {
++ blockMap.put(location, currentCount + 1)
++ }
++ }
++ }
++ )
++ )
++
++ val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
++ nodeCount1.getValue > nodeCount2.getValue
++ }
++ )
++
++ val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
++ firstOptionLocation ++ sortedNodesList
++ }
+}
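
The reworked getPreferredLocations above no longer returns just the recorded split locations: it also counts how many blocks each additional node hosts and appends the two busiest nodes, giving Spark fallback locality hints. A self-contained sketch of that counting-and-ranking step:

    object PreferredLocations {
      def compute(recorded: Seq[String], blockHosts: Seq[Seq[String]]): Seq[String] = {
        val firstOption = recorded.filter(_ != "localhost")
        // Count blocks per extra node, skipping nodes already recorded.
        val counts = scala.collection.mutable.LinkedHashMap[String, Int]()
        for (hosts <- blockHosts; host <- hosts
             if !firstOption.exists(loc => loc.equalsIgnoreCase(host))) {
          counts(host) = counts.getOrElse(host, 0) + 1
        }
        // Busiest two extra nodes, appended after the recorded locations.
        val busiest = counts.toSeq.sortBy(-_._2).map(_._1).take(2)
        firstOption ++ busiest
      }
    }

    // e.g. compute(Seq("node1"), Seq(Seq("node1", "node2"), Seq("node2"), Seq("node3")))
    // returns Seq("node1", "node2", "node3").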
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
index 1693008,b3effd3..5406e77
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
@@@ -79,22 -93,40 +79,35 @@@ object CarbonScalaUtil
}
}
- def convertCarbonToSparkDataType(dataType: DataType): types.DataType = {
- dataType match {
- case DataType.STRING => StringType
- case DataType.INT => IntegerType
- case DataType.LONG => LongType
- case DataType.DOUBLE => DoubleType
- case DataType.BOOLEAN => BooleanType
- case DataType.DECIMAL => DecimalType.SYSTEM_DEFAULT
- case DataType.TIMESTAMP => TimestampType
+ def getKettleHomePath(sqlContext: SQLContext): String = {
+ val carbonHome = System.getenv("CARBON_HOME")
+ var kettleHomePath: String = null
+ if (carbonHome != null) {
+ kettleHomePath = System.getenv("CARBON_HOME") + "/processing/carbonplugins"
+ }
+ if (kettleHomePath == null) {
+ kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
}
+ if (null == kettleHomePath) {
+ kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
+ }
+ kettleHomePath
}
+ def updateDataType(
+ currentDataType: org.apache.spark.sql.types.DataType): org.apache.spark.sql.types.DataType = {
+ currentDataType match {
+ case decimal: DecimalType =>
+ val scale = currentDataType.asInstanceOf[DecimalType].scale
+ DecimalType(DecimalType.MAX_PRECISION, scale)
+ case _ =>
+ currentDataType
+ }
+ }
+
+ case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
+
object CarbonSparkUtil {
- def createBaseRDD(carbonContext: CarbonContext, carbonTable: CarbonTable): TransformHolder = {
- val relation = CarbonEnv.getInstance(carbonContext).carbonCatalog
- .lookupRelation1(Option(carbonTable.getDatabaseName),
- carbonTable.getFactTableName, None)(carbonContext).asInstanceOf[CarbonRelation]
- val rdd = new SchemaRDD(carbonContext, relation)
- rdd.registerTempTable(carbonTable.getFactTableName)
- TransformHolder(rdd, createSparkMeta(carbonTable))
- }
def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
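
The new getKettleHomePath above resolves the kettle plugin directory through a fixed fallback chain: the CARBON_HOME environment variable (plus "/processing/carbonplugins"), then the Spark SQL conf key carbon.kettle.home, then the CarbonProperties store. A sketch of that chain using Option, with plain Maps standing in for the env, the SQLContext conf, and CarbonProperties:

    object KettleHome {
      def resolve(env: Map[String, String],
                  sqlConf: Map[String, String],
                  carbonProps: Map[String, String]): Option[String] =
        env.get("CARBON_HOME").map(_ + "/processing/carbonplugins")
          .orElse(sqlConf.get("carbon.kettle.home"))
          .orElse(carbonProps.get("carbon.kettle.home"))
    }

Expressing the chain as Option.orElse keeps the precedence order explicit and avoids the mutable null-checked variable in the original.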
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
index b381079,0e333dd..780c022
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
@@@ -80,11 -92,8 +92,9 @@@ class TestLoadDataWithDictionaryExclude
}
test("test load data with dictionary exclude & include and with empty dimension") {
+ sql("select ID from t3").show()
checkAnswer(
- sql("select ID from t3"), Seq(Row(1), Row(2), Row(3), Row(4), Row(5), Row(6), Row(7),
- Row(8), Row(9), Row(10), Row(11), Row(12), Row(13), Row(14), Row(15), Row(16), Row
- (17), Row(18), Row(19), Row(20))
+ sql("select ID from exclude_include_t3"), sql("select ID from exclude_include_hive_t3")
)
}
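
The assertion change above replaces a hard-coded list of expected rows with the output of an equivalent Hive table, so the Carbon result is validated against a reference engine rather than literals. A self-contained sketch of that comparison style, with stand-ins for the suite's Row type and checkAnswer helper:

    object CheckAnswerSketch {
      type Row = Seq[Any]

      // Stand-in for the test harness: order-insensitive result comparison.
      def checkAnswer(actual: Seq[Row], expected: Seq[Row]): Unit = {
        require(actual.sortBy(_.mkString) == expected.sortBy(_.mkString),
          s"result mismatch: $actual vs $expected")
      }

      def main(args: Array[String]): Unit = {
        val carbonResult: Seq[Row] = Seq(Seq(1), Seq(2), Seq(3))
        val hiveResult: Seq[Row] = Seq(Seq(3), Seq(2), Seq(1))
        checkAnswer(carbonResult, hiveResult) // passes: same rows, any order
      }
    }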
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
index 483a766,db67c86..a29cefb
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
@@@ -60,45 -61,82 +60,82 @@@ class TimestampDataTypeDirectDictionary
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss")
val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
.getCanonicalPath
- var csvFilePath = currentDirectory + "/src/test/resources/datasample.csv"
+ val csvFilePath = currentDirectory + "/src/test/resources/datasample.csv"
- sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE directDictionaryCube OPTIONS" +
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE directDictionaryTable OPTIONS" +
"('DELIMITER'= ',', 'QUOTECHAR'= '\"')");
-
} catch {
case x: Throwable => CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
}
}
- test("select doj from directDictionaryTable") {
+ test("test direct dictionary for not null condition") {
checkAnswer(
- sql("select doj from directDictionaryTable"),
- sql("select doj from directDictionaryCube where doj is not null"),
++ sql("select doj from directDictionaryTable where doj is not null"),
Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09.0")),
Row(Timestamp.valueOf("2016-04-14 15:00:09.0"))
)
)
}
+ test("test direct dictionary for getting all the values") {
+ checkAnswer(
+ sql("select doj from directDictionaryCube"),
+ Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09.0")),
+ Row(Timestamp.valueOf("2016-04-14 15:00:09.0")),
+ Row(null)
+ )
+ )
+ }
+
+ test("test direct dictionary for not equals condition") {
+ checkAnswer(
+ sql("select doj from directDictionaryCube where doj != '2016-04-14 15:00:09.0'"),
+ Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09.0"))
+ )
+ )
+ }
+
+ test("test direct dictionary for null condition") {
+ checkAnswer(
+ sql("select doj from directDictionaryCube where doj is null"),
+ Seq(Row(null)
+ )
+ )
+ }
- test("select doj from directDictionaryCube with equals filter") {
+ test("select doj from directDictionaryTable with equals filter") {
checkAnswer(
- sql("select doj from directDictionaryCube where doj='2016-03-14 15:00:09'"),
+ sql("select doj from directDictionaryTable where doj='2016-03-14 15:00:09'"),
Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09")))
)
}
- test("select doj from directDictionaryTable with greater than filter") {
- test("select doj from directDictionaryCube with regexp_replace equals filter") {
++ test("select doj from directDictionaryTable with regexp_replace equals filter") {
+ checkAnswer(
- sql("select doj from directDictionaryCube where regexp_replace(doj, '-', '/') = '2016/03/14 15:00:09'"),
++ sql("select doj from directDictionaryTable where regexp_replace(doj, '-', '/') = '2016/03/14 15:00:09'"),
+ Seq(Row(Timestamp.valueOf("2016-03-14 15:00:09")))
+ )
+ }
-
- test("select doj from directDictionaryCube with regexp_replace NOT IN filter") {
++
++ test("select doj from directDictionaryTable with regexp_replace NOT IN filter") {
+ checkAnswer(
- sql("select doj from directDictionaryCube where regexp_replace(doj, '-', '/') NOT IN ('2016/03/14 15:00:09')"),
++ sql("select doj from directDictionaryTable where regexp_replace(doj, '-', '/') NOT IN ('2016/03/14 15:00:09')"),
+ Seq(Row(Timestamp.valueOf("2016-04-14 15:00:09")), Row(null))
+ )
+ }
-
- test("select doj from directDictionaryCube with greater than filter") {
++
++ test("select doj from directDictionaryTable with greater than filter") {
checkAnswer(
- sql("select doj from directDictionaryCube where doj>'2016-03-14 15:00:09'"),
+ sql("select doj from directDictionaryTable where doj>'2016-03-14 15:00:09'"),
Seq(Row(Timestamp.valueOf("2016-04-14 15:00:09")))
)
-
}
- test("select count(doj) from directDictionaryCube") {
+ test("select count(doj) from directDictionaryTable") {
checkAnswer(
- sql("select count(doj) from directDictionaryCube"),
+ sql("select count(doj) from directDictionaryTable"),
Seq(Row(2))
)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryWithNoDictTestCase.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeNullDataTest.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
index 161a19e,7cb6dfd..4075e60
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
@@@ -41,14 -41,26 +41,26 @@@ class AllDataTypesTestCaseFilter extend
}
- test("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeFilter where empname in ('arvind','ayushi') group by empno,empname,utilization") {
+ test("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestableFilter where empname in ('arvind','ayushi') group by empno,empname,utilization") {
checkAnswer(
- sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeFilter where empname in ('arvind','ayushi') group by empno,empname,utilization"),
- sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeFilter_hive where empname in ('arvind','ayushi') group by empno,empname,utilization"))
+ sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestableFilter where empname in ('arvind','ayushi') group by empno,empname,utilization"),
+ sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestableFilter_hive where empname in ('arvind','ayushi') group by empno,empname,utilization"))
}
-
+
+  test("select empno,empname from alldatatypestableFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') NOT IN ('development')") {
+    checkAnswer(
+      sql("select empno,empname from alldatatypestableFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') NOT IN ('development')"),
+      sql("select empno,empname from alldatatypestableFilter_hive where regexp_replace(workgroupcategoryname, 'er', 'ment') NOT IN ('development')"))
+  }
+
+  test("select empno,empname from alldatatypestableFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'") {
+    checkAnswer(
+      sql("select empno,empname from alldatatypestableFilter where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'"),
+      sql("select empno,empname from alldatatypestableFilter_hive where regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'"))
+  }
+
override def afterAll {
- sql("drop table alldatatypescubeFilter")
- sql("drop table alldatatypescubeFilter_hive")
+ sql("drop table alldatatypestableFilter")
+ sql("drop table alldatatypestableFilter_hive")
}
}
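
The two regexp_replace tests above are worth a note: because the filter wraps the column in a
function, it cannot be pushed down as a plain dictionary or range predicate and has to be
evaluated row by row against the expression result. A hedged Java illustration of that
row-level check (not CarbonData's executor code; the class name is made up):

import java.util.Arrays;

public final class RegexpReplaceFilterDemo {
  // Mirrors: regexp_replace(workgroupcategoryname, 'er', 'ment') != 'development'
  static boolean passes(String workgroupCategoryName) {
    if (workgroupCategoryName == null) {
      return false; // in SQL, NULL != 'development' is not true
    }
    return !workgroupCategoryName.replaceAll("er", "ment").equals("development");
  }

  public static void main(String[] args) {
    // "developer" becomes "development" after the replacement, so it is filtered out.
    for (String name : Arrays.asList("developer", "manager", "network")) {
      System.out.println(name + " -> " + passes(name));
    }
  }
}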
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/lcm/locks/CarbonLockFactory.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/lcm/locks/CarbonLockFactory.java
index 10aa1fd,0000000..0fae687
mode 100644,000000..100644
--- a/processing/src/main/java/org/carbondata/lcm/locks/CarbonLockFactory.java
+++ b/processing/src/main/java/org/carbondata/lcm/locks/CarbonLockFactory.java
@@@ -1,73 -1,0 +1,94 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.lcm.locks;
+
+import org.carbondata.core.carbon.CarbonTableIdentifier;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.util.CarbonProperties;
+
+/**
+ * This is a lock factory class that provides lock objects.
+ * Using a lock object, a client can acquire and release a lock.
+ */
+public class CarbonLockFactory {
+
+ /**
+   * The lock type configured for carbon: local file, ZooKeeper or HDFS based.
+ */
+ private static String lockTypeConfigured;
+
+ static {
- CarbonLockFactory.updateZooKeeperLockingStatus();
++    CarbonLockFactory.initLockTypeConfigured();
+ }
+
+ /**
+   * This method creates the lock object for the given table, based on the configured lock type.
+   *
+   * @param tableIdentifier identifier of the table to be locked
+   * @param lockFile name of the lock file
+   * @return carbon lock
+ */
+ public static ICarbonLock getCarbonLockObj(CarbonTableIdentifier tableIdentifier,
+ String lockFile) {
+ switch (lockTypeConfigured.toUpperCase()) {
+ case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
+ return new LocalFileLock(tableIdentifier, lockFile);
+
+ case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
+ return new ZooKeeperLocking(tableIdentifier, lockFile);
+
+ case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
+ return new HdfsFileLock(tableIdentifier, lockFile);
+
+ default:
+        throw new UnsupportedOperationException("Unsupported lock type: " + lockTypeConfigured);
+ }
+ }
+
+  /**
++   * This method creates the lock object for the given lock file location, based on the
++   * configured lock type.
++   *
++   * @param lockFileLocation location of the lock file
++   * @param lockFile name of the lock file
++   * @return carbon lock
++   */
++  public static ICarbonLock getCarbonLockObj(String lockFileLocation, String lockFile) {
++    switch (lockTypeConfigured.toUpperCase()) {
++      case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
++        return new LocalFileLock(lockFileLocation, lockFile);
++
++      case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
++        return new ZooKeeperLocking(lockFileLocation, lockFile);
++
++      case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
++        return new HdfsFileLock(lockFileLocation, lockFile);
++
++      default:
++        throw new UnsupportedOperationException("Unsupported lock type: " + lockTypeConfigured);
++    }
++  }
++
++ /**
+   * This method reads the configured lock type from the carbon properties.
+   */
-  private static void updateZooKeeperLockingStatus() {
++  private static void initLockTypeConfigured() {
+ lockTypeConfigured = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.LOCK_TYPE_DEFAULT);
-
+ }
+
+}
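
A typical call pattern for the new location-based overload, sketched under the assumption
that ICarbonLock exposes lockWithRetries() and unlock(), as the initRetry() calls in the lock
implementations suggest; the table path and lock file name below are made-up examples:

ICarbonLock metaLock = CarbonLockFactory.getCarbonLockObj(
    "default" + CarbonCommonConstants.FILE_SEPARATOR + "t1", "meta.lock");
if (metaLock.lockWithRetries()) {
  try {
    // critical section: mutate the table status or metadata here
  } finally {
    metaLock.unlock();
  }
} else {
  // the lock could not be acquired within the configured retries
}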
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/lcm/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/lcm/locks/HdfsFileLock.java
index b45ae96,0000000..3305477
mode 100644,000000..100644
--- a/processing/src/main/java/org/carbondata/lcm/locks/HdfsFileLock.java
+++ b/processing/src/main/java/org/carbondata/lcm/locks/HdfsFileLock.java
@@@ -1,91 -1,0 +1,106 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.lcm.locks;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
++import org.carbondata.common.logging.LogService;
++import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.CarbonTableIdentifier;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.datastorage.store.impl.FileFactory;
++import org.carbondata.core.util.CarbonProperties;
+
+/**
+ * This class is used to handle HDFS file locking.
+ * This is achieved by acquiring the lock file's data output stream in append mode.
+ */
+public class HdfsFileLock extends AbstractCarbonLock {
+
++ private static final LogService LOGGER =
++ LogServiceFactory.getLogService(HdfsFileLock.class.getName());
+ /**
+ * location hdfs file location
+ */
+ private String location;
+
+ private DataOutputStream dataOutputStream;
+
+ public static String tmpPath;
+
+ static {
- tmpPath = System.getProperty("hadoop.tmp.dir");
++ tmpPath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
++ System.getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION));
++ }
++
++  /**
++   * @param lockFileLocation location of the lock file
++   * @param lockFile name of the lock file
++   */
++  public HdfsFileLock(String lockFileLocation, String lockFile) {
++    this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation
++        + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
++    LOGGER.info("HDFS lock path: " + this.location);
++    initRetry();
+ }
+
+ /**
+   * @param tableIdentifier identifier of the table to be locked
+   * @param lockFile name of the lock file
+ */
+ public HdfsFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
- this.location =
- tmpPath + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName()
- + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getTableName()
- + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
++ this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier
++ .getTableName(), lockFile);
+ }
+
+ /* (non-Javadoc)
+ * @see org.carbondata.core.locks.ICarbonLock#lock()
+ */
+ @Override public boolean lock() {
+ try {
+ if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) {
+ FileFactory.createNewLockFile(location, FileFactory.getFileType(location));
+ }
+ dataOutputStream =
+ FileFactory.getDataOutputStreamUsingAppend(location, FileFactory.getFileType(location));
+
+ return true;
+
+ } catch (IOException e) {
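+      // append failed, most likely because another process holds the file's write lease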
+ return false;
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.carbondata.core.locks.ICarbonLock#unlock()
+ */
+ @Override public boolean unlock() {
+ if (null != dataOutputStream) {
+ try {
+ dataOutputStream.close();
+ } catch (IOException e) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+}
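
The mutual exclusion in HdfsFileLock rests on HDFS's single-writer lease: only one client may
hold a file open for append at a time. A hedged, standalone illustration using the plain
Hadoop FileSystem API (not CarbonData code; the path below is a made-up example):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class HdfsAppendLockDemo {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    Path lockPath = new Path("/tmp/carbon/demo.lock");
    if (!fs.exists(lockPath)) {
      fs.create(lockPath).close();
    }
    try (FSDataOutputStream out = fs.append(lockPath)) {
      // While `out` is open this client holds the write lease, so a concurrent
      // append() from any other process fails with an IOException: that is the lock.
      System.out.println("lock held");
    } catch (IOException e) {
      System.out.println("lock busy: " + e.getMessage());
    }
  }
}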
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
index 88f9a23,0000000..15374a3
mode 100644,000000..100644
--- a/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
+++ b/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
@@@ -1,151 -1,0 +1,159 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.lcm.locks;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.CarbonTableIdentifier;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.datastorage.store.impl.FileFactory;
+
+/**
+ * This class handles file locking in the local file system.
+ * Locking is implemented with the NIO file channel lock API.
+ */
+public class LocalFileLock extends AbstractCarbonLock {
+ /**
+ * location is the location of the lock file.
+ */
+ private String location;
+
+ /**
+ * fileOutputStream of the local lock file
+ */
+ private FileOutputStream fileOutputStream;
+
+ /**
+ * channel is the FileChannel of the lock file.
+ */
+ private FileChannel channel;
+
+ /**
+ * fileLock NIO FileLock Object
+ */
+ private FileLock fileLock;
+
+ /**
+ * lock file
+ */
+ private String lockFile;
+
+ public static final String tmpPath;
+
+ /**
+ * LOGGER for logging the messages.
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(LocalFileLock.class.getName());
+
+ static {
+ tmpPath = System.getProperty("java.io.tmpdir");
+ }
+
+ /**
++   * @param lockFileLocation location of the lock file
++   * @param lockFile name of the lock file
++ */
++ public LocalFileLock(String lockFileLocation, String lockFile) {
++ this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation;
++ this.lockFile = lockFile;
++ initRetry();
++ }
++
++ /**
+   * @param tableIdentifier identifier of the table to be locked
+   * @param lockFile name of the lock file
+ */
+ public LocalFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
- this.location =
- tmpPath + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName()
- + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getTableName();
- this.lockFile = lockFile;
++ this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier
++ .getTableName(), lockFile);
+ }
+
+ /**
+ * Lock API for locking of the file channel of the lock file.
+ *
+ * @return
+ */
+ @Override public boolean lock() {
+ try {
+ if (!FileFactory.isFileExist(location, FileFactory.getFileType(tmpPath))) {
+ FileFactory.mkdirs(location, FileFactory.getFileType(tmpPath));
+ }
+ String lockFilePath = location + CarbonCommonConstants.FILE_SEPARATOR +
+ lockFile;
+ if (!FileFactory.isFileExist(lockFilePath, FileFactory.getFileType(location))) {
+ FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(location));
+ }
+
+ fileOutputStream = new FileOutputStream(lockFilePath);
+ channel = fileOutputStream.getChannel();
+ try {
+ fileLock = channel.tryLock();
+ } catch (OverlappingFileLockException e) {
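+        // tryLock throws here when the lock is already held within this JVM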
+ return false;
+ }
+      return null != fileLock;
+ } catch (IOException e) {
+ return false;
+ }
+
+ }
+
+ /**
+ * Unlock API for unlocking of the acquired lock.
+ *
+ * @return
+ */
+ @Override public boolean unlock() {
+ boolean status;
+ try {
+ if (null != fileLock) {
+ fileLock.release();
+ }
+ status = true;
+ } catch (IOException e) {
+ status = false;
+ } finally {
+ if (null != fileOutputStream) {
+ try {
+ fileOutputStream.close();
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+ }
+ return status;
+ }
+
+}
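
LocalFileLock depends on two FileChannel.tryLock() behaviors that are easy to conflate:
tryLock() returns null when another process holds the lock, but throws
OverlappingFileLockException when the same JVM already holds it, which is why the class above
needs both the null check and the catch block. A small self-contained demo of the same-JVM
case:

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

public final class LocalFileLockDemo {
  public static void main(String[] args) throws IOException {
    String path = System.getProperty("java.io.tmpdir") + "/demo.lock";
    try (FileOutputStream first = new FileOutputStream(path);
         FileOutputStream second = new FileOutputStream(path)) {
      FileLock held = first.getChannel().tryLock();
      System.out.println("first acquire succeeded: " + (held != null));
      try {
        second.getChannel().tryLock(); // same JVM: throws instead of returning null
      } catch (OverlappingFileLockException e) {
        System.out.println("second acquire refused inside the same JVM");
      }
      if (held != null) {
        held.release();
      }
    }
  }
}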