Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/19 02:16:50 UTC
[1/4] incubator-carbondata git commit: Improved spark module code. *
Removed some compilation warnings. * Replaced pattern matching on booleans with
IF-ELSE. * Improved code according to Scala standards. * Removed unnecessary
new lines. * Added string interpolation.
Repository: incubator-carbondata
Updated Branches:
refs/heads/master c5176f31e -> 0a8e782ff
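The two most frequent refactors in the diffs below are replacing pattern matches on Boolean values with plain if-else, and replacing string concatenation with string interpolation. A minimal standalone sketch of both, for orientation only; the object and method names here are hypothetical and are not code from this commit:

object RefactorSketch {
  // Before (as in the old code): matching on a Boolean value
  //   carbonFile.isDirectory match {
  //     case true  => ...
  //     case false => ...
  //   }
  // After: a plain if-else is the idiomatic Scala form.
  def describe(isDirectory: Boolean, path: String): String = {
    if (isDirectory) {
      // Before: "Directory found at " + path
      // After: string interpolation
      s"Directory found at $path"
    } else {
      s"File found at $path"
    }
  }

  def main(args: Array[String]): Unit = {
    println(describe(isDirectory = true, path = "/tmp/data"))
    println(describe(isDirectory = false, path = "/tmp/data.csv"))
  }
}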
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index 0a35b21..89e1aa9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -40,20 +40,18 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
override def evaluate(carbonRowInstance: RowIntf): ExpressionResult = {
- val values = carbonRowInstance.getValues().toSeq.map { value =>
- value match {
- case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
- case d: java.math.BigDecimal =>
- val javaDecVal = new java.math.BigDecimal(d.toString())
- val scalaDecVal = new scala.math.BigDecimal(javaDecVal)
- val decConverter = new org.apache.spark.sql.types.Decimal()
- decConverter.set(scalaDecVal)
- case _ => value
- }
+ val values = carbonRowInstance.getValues.toSeq.map {
+ case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
+ case d: java.math.BigDecimal =>
+ val javaDecVal = new java.math.BigDecimal(d.toString)
+ val scalaDecVal = new scala.math.BigDecimal(javaDecVal)
+ val decConverter = new org.apache.spark.sql.types.Decimal()
+ decConverter.set(scalaDecVal)
+ case value => value
}
try {
val result = evaluateExpression(
- new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
+ new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
val sparkRes = if (isExecutor) {
result.asInstanceOf[InternalRow].get(0, sparkExp.dataType)
} else {
@@ -62,17 +60,16 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
new ExpressionResult(CarbonScalaUtil.convertSparkToCarbonDataType(sparkExp.dataType),
sparkRes
)
- }
- catch {
- case e: Exception => throw new FilterUnsupportedException(e.getMessage())
+ } catch {
+ case e: Exception => throw new FilterUnsupportedException(e.getMessage)
}
}
- override def getFilterExpressionType(): ExpressionType = {
+ override def getFilterExpressionType: ExpressionType = {
ExpressionType.UNKNOWN
}
- override def getString(): String = {
+ override def getString: String = {
sparkExp.toString()
}
@@ -81,46 +78,45 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
isExecutor = true
}
- def getColumnList(): java.util.List[ColumnExpression] = {
+ def getColumnList: java.util.List[ColumnExpression] = {
val lst = new java.util.ArrayList[ColumnExpression]()
getColumnListFromExpressionTree(sparkExp, lst)
lst
}
- def getLiterals(): java.util.List[ExpressionResult] = {
+ def getLiterals: java.util.List[ExpressionResult] = {
val lst = new java.util.ArrayList[ExpressionResult]()
lst
}
- def getAllColumnList(): java.util.List[ColumnExpression] = {
+ def getAllColumnList: java.util.List[ColumnExpression] = {
val lst = new java.util.ArrayList[ColumnExpression]()
getAllColumnListFromExpressionTree(sparkExp, lst)
lst
}
- def isSingleDimension(): Boolean = {
+ def isSingleDimension: Boolean = {
val lst = new java.util.ArrayList[ColumnExpression]()
getAllColumnListFromExpressionTree(sparkExp, lst)
if (lst.size == 1 && lst.get(0).isDimension) {
true
- }
- else {
+ } else {
false
}
}
def getColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
- list: java.util.List[ColumnExpression]): Unit = {
+ list: java.util.List[ColumnExpression]): Unit = {
sparkCurrentExp match {
case carbonBoundRef: CarbonBoundReference =>
val foundExp = list.asScala
- .find(p => p.getColumnName() == carbonBoundRef.colExp.getColumnName())
+ .find(p => p.getColumnName == carbonBoundRef.colExp.getColumnName)
if (foundExp.isEmpty) {
carbonBoundRef.colExp.setColIndex(list.size)
list.add(carbonBoundRef.colExp)
} else {
- carbonBoundRef.colExp.setColIndex(foundExp.get.getColIndex())
+ carbonBoundRef.colExp.setColIndex(foundExp.get.getColIndex)
}
case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
}
@@ -128,7 +124,7 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
def getAllColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
- list: List[ColumnExpression]): List[ColumnExpression] = {
+ list: List[ColumnExpression]): List[ColumnExpression] = {
sparkCurrentExp match {
case carbonBoundRef: CarbonBoundReference => list.add(carbonBoundRef.colExp)
case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
@@ -136,13 +132,12 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
list
}
- def isDirectDictionaryColumns(): Boolean = {
+ def isDirectDictionaryColumns: Boolean = {
val lst = new ArrayList[ColumnExpression]()
getAllColumnListFromExpressionTree(sparkExp, lst)
if (lst.get(0).getCarbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
true
- }
- else {
+ } else {
false
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index ed757e3..a6b4ec5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -45,7 +45,8 @@ import org.apache.carbondata.core.carbon.metadata.datatype.DataType
import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension,
+ColumnSchema}
import org.apache.carbondata.core.carbon.path.CarbonStorePath
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastorage.store.impl.FileFactory
@@ -61,7 +62,8 @@ import org.apache.carbondata.spark.CarbonSparkFactory
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.load._
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil,
+GlobalDictionaryUtil}
case class tableModel(
ifNotExistsSet: Boolean,
@@ -166,18 +168,24 @@ case class NodeInfo(TaskId: String, noOfBlocks: Int)
case class AlterTableModel(dbName: Option[String], tableName: String,
- compactionType: String, alterSql: String)
+ compactionType: String, alterSql: String)
case class CompactionModel(compactionSize: Long,
- compactionType: CompactionType,
- carbonTable: CarbonTable,
- tableCreationTime: Long,
- isDDLTrigger: Boolean)
+ compactionType: CompactionType,
+ carbonTable: CarbonTable,
+ tableCreationTime: Long,
+ isDDLTrigger: Boolean)
-case class CompactionCallableModel(storePath: String, carbonLoadModel: CarbonLoadModel,
- partitioner: Partitioner, storeLocation: String, carbonTable: CarbonTable, kettleHomePath: String,
- cubeCreationTime: Long, loadsToMerge: util.List[LoadMetadataDetails], sqlContext: SQLContext,
- compactionType: CompactionType)
+case class CompactionCallableModel(storePath: String,
+ carbonLoadModel: CarbonLoadModel,
+ partitioner: Partitioner,
+ storeLocation: String,
+ carbonTable: CarbonTable,
+ kettleHomePath: String,
+ cubeCreationTime: Long,
+ loadsToMerge: util.List[LoadMetadataDetails],
+ sqlContext: SQLContext,
+ compactionType: CompactionType)
object TableNewProcessor {
def apply(cm: tableModel, sqlContext: SQLContext): TableInfo = {
@@ -189,6 +197,7 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
var index = 0
var rowGroup = 0
+
def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
fieldChildren.foreach(fields => {
@@ -294,12 +303,12 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
// Its based on the dimension name and measure name
allColumns.groupBy(_.getColumnName).foreach(f => if (f._2.size > 1) {
val name = f._1
- LOGGER.error(s"Duplicate column found with name : $name")
+ LOGGER.error(s"Duplicate column found with name: $name")
LOGGER.audit(
s"Validation failed for Create/Alter Table Operation " +
- s"for ${cm.databaseName}.${cm.tableName}" +
- s"Duplicate column found with name : $name")
- sys.error(s"Duplicate dimensions found with name : $name")
+ s"for ${ cm.databaseName }.${ cm.tableName }" +
+ s"Duplicate column found with name: $name")
+ sys.error(s"Duplicate dimensions found with name: $name")
})
val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
@@ -314,14 +323,11 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
for (column <- allColumns) {
if (highCardinalityDims.contains(column.getColumnName)) {
newOrderedDims += column
- }
- else if (column.isComplex) {
+ } else if (column.isComplex) {
complexDims += column
- }
- else if (column.isDimensionColumn) {
+ } else if (column.isDimensionColumn) {
newOrderedDims += column
- }
- else {
+ } else {
measures += column
}
@@ -333,7 +339,7 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
// When the column is measure or the specified no inverted index column in DDL,
// set useInvertedIndex to false, otherwise true.
if (noInvertedIndexCols.contains(column.getColumnName) ||
- cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) {
+ cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) {
column.setUseInvertedIndex(false)
} else {
column.setUseInvertedIndex(true)
@@ -378,25 +384,22 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
Partitioner(
"org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
Array(""), part.partitionCount, null)
- }
- else {
+ } else {
// case where partition cols are set and partition class is not set.
// so setting the default value.
Partitioner(
"org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
part.partitionColumn, part.partitionCount, null)
}
- }
- else if (definedpartCols.nonEmpty) {
+ } else if (definedpartCols.nonEmpty) {
val msg = definedpartCols.mkString(", ")
- LOGGER.error(s"partition columns specified are not part of Dimension columns : $msg")
+ LOGGER.error(s"partition columns specified are not part of Dimension columns: $msg")
LOGGER.audit(
s"Validation failed for Create/Alter Table Operation for " +
- s"${cm.databaseName}.${cm.tableName} " +
- s"partition columns specified are not part of Dimension columns : $msg")
- sys.error(s"partition columns specified are not part of Dimension columns : $msg")
- }
- else {
+ s"${ cm.databaseName }.${ cm.tableName } " +
+ s"partition columns specified are not part of Dimension columns: $msg")
+ sys.error(s"partition columns specified are not part of Dimension columns: $msg")
+ } else {
try {
Class.forName(part.partitionClass).newInstance()
@@ -405,9 +408,9 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
val cl = part.partitionClass
LOGGER.audit(
s"Validation failed for Create/Alter Table Operation for " +
- s"${cm.databaseName}.${cm.tableName} " +
- s"partition class specified can not be found or loaded : $cl")
- sys.error(s"partition class specified can not be found or loaded : $cl")
+ s"${ cm.databaseName }.${ cm.tableName } " +
+ s"partition class specified can not be found or loaded: $cl")
+ sys.error(s"partition class specified can not be found or loaded: $cl")
}
Partitioner(part.partitionClass, columnBuffer.toArray, part.partitionCount, null)
@@ -578,42 +581,42 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
// Its based on the dimension name and measure name
levels.groupBy(_.name).foreach(f => if (f._2.size > 1) {
val name = f._1
- LOGGER.error(s"Duplicate dimensions found with name : $name")
+ LOGGER.error(s"Duplicate dimensions found with name: $name")
LOGGER.audit(
- s"Validation failed for Create/Alter Table Operation " +
- s"for ${cm.databaseName}.${cm.tableName} " +
- s"Duplicate dimensions found with name : $name")
- sys.error(s"Duplicate dimensions found with name : $name")
+ "Validation failed for Create/Alter Table Operation " +
+ s"for ${ cm.databaseName }.${ cm.tableName } " +
+ s"Duplicate dimensions found with name: $name")
+ sys.error(s"Duplicate dimensions found with name: $name")
})
levels.groupBy(_.column).foreach(f => if (f._2.size > 1) {
val name = f._1
- LOGGER.error(s"Duplicate dimensions found with column name : $name")
+ LOGGER.error(s"Duplicate dimensions found with column name: $name")
LOGGER.audit(
- s"Validation failed for Create/Alter Table Operation " +
- s"for ${cm.databaseName}.${cm.tableName} " +
- s"Duplicate dimensions found with column name : $name")
- sys.error(s"Duplicate dimensions found with column name : $name")
+ "Validation failed for Create/Alter Table Operation " +
+ s"for ${ cm.databaseName }.${ cm.tableName } " +
+ s"Duplicate dimensions found with column name: $name")
+ sys.error(s"Duplicate dimensions found with column name: $name")
})
measures.groupBy(_.name).foreach(f => if (f._2.size > 1) {
val name = f._1
- LOGGER.error(s"Duplicate measures found with name : $name")
+ LOGGER.error(s"Duplicate measures found with name: $name")
LOGGER.audit(
s"Validation failed for Create/Alter Table Operation " +
- s"for ${cm.databaseName}.${cm.tableName} " +
- s"Duplicate measures found with name : $name")
- sys.error(s"Duplicate measures found with name : $name")
+ s"for ${ cm.databaseName }.${ cm.tableName } " +
+ s"Duplicate measures found with name: $name")
+ sys.error(s"Duplicate measures found with name: $name")
})
measures.groupBy(_.column).foreach(f => if (f._2.size > 1) {
val name = f._1
- LOGGER.error(s"Duplicate measures found with column name : $name")
+ LOGGER.error(s"Duplicate measures found with column name: $name")
LOGGER.audit(
s"Validation failed for Create/Alter Table Operation " +
- s"for ${cm.databaseName}.${cm.tableName} " +
- s"Duplicate measures found with column name : $name")
- sys.error(s"Duplicate measures found with column name : $name")
+ s"for ${ cm.databaseName }.${ cm.tableName } " +
+ s"Duplicate measures found with column name: $name")
+ sys.error(s"Duplicate measures found with column name: $name")
})
val levelsArray = levels.map(_.name)
@@ -625,7 +628,7 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
LOGGER.error(s"Aggregator should not be defined for dimension fields [$fault]")
LOGGER.audit(
s"Validation failed for Create/Alter Table Operation for " +
- s"${cm.databaseName}.${cm.tableName} " +
+ s"${ cm.databaseName }.${ cm.tableName } " +
s"Aggregator should not be defined for dimension fields [$fault]")
sys.error(s"Aggregator should not be defined for dimension fields [$fault]")
}
@@ -633,12 +636,12 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
levelsNdMesures.groupBy(x => x).foreach(f => if (f._2.size > 1) {
val name = f._1
- LOGGER.error(s"Dimension and Measure defined with same name : $name")
+ LOGGER.error(s"Dimension and Measure defined with same name: $name")
LOGGER.audit(
s"Validation failed for Create/Alter Table Operation " +
- s"for ${cm.databaseName}.${cm.tableName} " +
- s"Dimension and Measure defined with same name : $name")
- sys.error(s"Dimension and Measure defined with same name : $name")
+ s"for ${ cm.databaseName }.${ cm.tableName } " +
+ s"Dimension and Measure defined with same name: $name")
+ sys.error(s"Dimension and Measure defined with same name: $name")
})
dimSrcDimensions.foreach(d => {
@@ -677,8 +680,7 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
val matchedMapping = aggs.filter(agg => f.name.equals(agg.msrName))
if (matchedMapping.isEmpty) {
f
- }
- else {
+ } else {
Measure(f.name, f.column, f.dataType, matchedMapping.head.aggType)
}
}
@@ -708,17 +710,14 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
Partitioner(
"org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
Array(""), part.partitionCount, null)
- }
- else if (definedpartCols.nonEmpty) {
+ } else if (definedpartCols.nonEmpty) {
val msg = definedpartCols.mkString(", ")
- LOGGER.error(s"partition columns specified are not part of Dimension columns : $msg")
+ LOGGER.error(s"partition columns specified are not part of Dimension columns: $msg")
LOGGER.audit(
s"Validation failed for Create/Alter Table Operation - " +
- s"partition columns specified are not part of Dimension columns : $msg")
- sys.error(s"partition columns specified are not part of Dimension columns : $msg")
- }
- else {
-
+ s"partition columns specified are not part of Dimension columns: $msg")
+ sys.error(s"partition columns specified are not part of Dimension columns: $msg")
+ } else {
try {
Class.forName(part.partitionClass).newInstance()
} catch {
@@ -726,9 +725,9 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
val cl = part.partitionClass
LOGGER.audit(
s"Validation failed for Create/Alter Table Operation for " +
- s"${cm.databaseName}.${cm.tableName} " +
- s"partition class specified can not be found or loaded : $cl")
- sys.error(s"partition class specified can not be found or loaded : $cl")
+ s"${ cm.databaseName }.${ cm.tableName } " +
+ s"partition class specified can not be found or loaded: $cl")
+ sys.error(s"partition class specified can not be found or loaded: $cl")
}
Partitioner(part.partitionClass, columnBuffer.toArray, part.partitionCount, null)
@@ -782,8 +781,8 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
val databaseName = getDB.getDatabaseName(alterTableModel.dbName, sqlContext)
if (null == org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
.getCarbonTable(databaseName + "_" + tableName)) {
- logError("alter table failed. table not found: " + databaseName + "." + tableName)
- sys.error("alter table failed. table not found: " + databaseName + "." + tableName)
+ logError(s"alter table failed. table not found: $databaseName.$tableName")
+ sys.error(s"alter table failed. table not found: $databaseName.$tableName")
}
val relation =
@@ -824,17 +823,14 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
kettleHomePath,
storeLocation
)
- }
- catch {
+ } catch {
case e: Exception =>
if (null != e.getMessage) {
- sys.error("Compaction failed. Please check logs for more info." + e.getMessage)
- }
- else {
+ sys.error(s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
+ } else {
sys.error("Exception in compaction. Please check logs for more info.")
}
}
-
Seq.empty
}
}
@@ -861,9 +857,7 @@ case class CreateTable(cm: tableModel) extends RunnableCommand {
s"Table [$tbName] already exists under database [$dbName]")
sys.error(s"Table [$tbName] already exists under database [$dbName]")
}
- }
- else {
-
+ } else {
// Add Database to catalog and persist
val catalog = CarbonEnv.getInstance(sqlContext).carbonCatalog
// Need to fill partitioner class when we support partition
@@ -872,7 +866,7 @@ case class CreateTable(cm: tableModel) extends RunnableCommand {
sqlContext.sql(
s"""CREATE TABLE $dbName.$tbName USING carbondata""" +
s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$tablePath") """)
- .collect
+ .collect
} catch {
case e: Exception =>
val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
@@ -882,7 +876,7 @@ case class CreateTable(cm: tableModel) extends RunnableCommand {
.dropTable(catalog.storePath, identifier)(sqlContext)
LOGGER.audit(s"Table creation with Database name [$dbName] " +
- s"and Table name [$tbName] failed")
+ s"and Table name [$tbName] failed")
throw e
}
@@ -940,8 +934,8 @@ private[sql] case class DeleteLoadsById(
LOGGER.audit(s"Delete segment by Id is successfull for $databaseName.$tableName.")
}
else {
- sys.error("Delete segment by Id is failed. Invalid ID is :"
- + invalidLoadIds.mkString(","))
+ sys.error("Delete segment by Id is failed. Invalid ID is:" +
+ s" ${ invalidLoadIds.mkString(",") }")
}
} catch {
case ex: Exception =>
@@ -963,10 +957,10 @@ private[sql] case class DeleteLoadsById(
}
private[sql] case class DeleteLoadsByLoadDate(
- databaseNameOp: Option[String],
- tableName: String,
- dateField: String,
- loadDate: String) extends RunnableCommand {
+ databaseNameOp: Option[String],
+ tableName: String,
+ dateField: String,
+ loadDate: String) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.tablemodel.tableSchema")
@@ -980,12 +974,12 @@ private[sql] case class DeleteLoadsByLoadDate(
if (relation == null) {
LOGGER
.audit(s"Delete segment by load date is failed. Table $dbName.$tableName does not " +
- s"exist")
+ s"exist")
sys.error(s"Table $dbName.$tableName does not exist")
}
val timeObj = Cast(Literal(loadDate), TimestampType).eval()
- if(null == timeObj) {
+ if (null == timeObj) {
val errorMessage = "Error: Invalid load start time format " + loadDate
throw new MalformedCarbonCommandException(errorMessage)
}
@@ -1037,20 +1031,20 @@ case class LoadTable(
val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
val identifier = TableIdentifier(tableName, Option(dbName))
if (isOverwriteExist) {
- sys.error("Overwrite is not supported for carbon table with " + dbName + "." + tableName)
+ sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
}
if (null == org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
.getCarbonTable(dbName + "_" + tableName)) {
- logError("Data loading failed. table not found: " + dbName + "." + tableName)
- LOGGER.audit("Data loading failed. table not found: " + dbName + "." + tableName)
- sys.error("Data loading failed. table not found: " + dbName + "." + tableName)
+ logError(s"Data loading failed. table not found: $dbName.$tableName")
+ LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+ sys.error(s"Data loading failed. table not found: $dbName.$tableName")
}
val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
- .lookupRelation1(Option(dbName), tableName)(sqlContext)
- .asInstanceOf[CarbonRelation]
+ .lookupRelation1(Option(dbName), tableName)(sqlContext)
+ .asInstanceOf[CarbonRelation]
if (relation == null) {
- sys.error(s"Table $dbName.$tableName does not exist")
+ sys.error(s"Table $dbName.$tableName does not exist")
}
CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
val carbonLock = CarbonLockFactory
@@ -1066,8 +1060,13 @@ case class LoadTable(
sys.error("Table is locked for updation. Please try after some time")
}
- val factPath = if (dataFrame.isDefined) "" else FileUtils.getPaths(
- CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
+ val factPath = if (dataFrame.isDefined) {
+ ""
+ }
+ else {
+ FileUtils.getPaths(
+ CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
+ }
val carbonLoadModel = new CarbonLoadModel()
carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
@@ -1127,7 +1126,8 @@ case class LoadTable(
case "false" => false
case illegal =>
val errorMessage = "Illegal syntax found: [" + illegal + "] .The value multiline in " +
- "load DDL which you set can only be 'true' or 'false', please check your input DDL."
+ "load DDL which you set can only be 'true' or 'false', please check " +
+ "your input DDL."
throw new MalformedCarbonCommandException(errorMessage)
}
val maxColumns = options.getOrElse("maxcolumns", null)
@@ -1165,8 +1165,7 @@ case class LoadTable(
// First system has to partition the data first and then call the load data
if (null == relation.tableMeta.partitioner.partitionColumn ||
relation.tableMeta.partitioner.partitionColumn(0).isEmpty) {
- LOGGER.info("Initiating Direct Load for the Table : (" +
- dbName + "." + tableName + ")")
+ LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
carbonLoadModel.setFactFilePath(factPath)
carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
carbonLoadModel.setCsvHeader(fileHeader)
@@ -1185,14 +1184,12 @@ case class LoadTable(
partitionStatus,
useKettle,
dataFrame)
- }
- catch {
+ } catch {
case ex: Exception =>
LOGGER.error(ex)
LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
throw ex
- }
- finally {
+ } finally {
// Once the data load is successful delete the unwanted partition files
try {
val fileType = FileFactory.getFileType(partitionLocation)
@@ -1205,7 +1202,7 @@ case class LoadTable(
case ex: Exception =>
LOGGER.error(ex)
LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
- "Problem deleting the partition folder")
+ "Problem deleting the partition folder")
throw ex
}
@@ -1229,12 +1226,12 @@ case class LoadTable(
Seq.empty
}
- private def validateDateFormat(dateFormat: String, table: CarbonTable): Unit = {
+ private def validateDateFormat(dateFormat: String, table: CarbonTable): Unit = {
val dimensions = table.getDimensionByTableName(tableName).asScala
if (dateFormat != null) {
if (dateFormat.trim == "") {
throw new MalformedCarbonCommandException("Error: Option DateFormat is set an empty " +
- "string.")
+ "string.")
} else {
var dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
for (singleDateFormat <- dateFormats) {
@@ -1242,11 +1239,13 @@ case class LoadTable(
val columnName = dateFormatSplits(0).trim.toLowerCase
if (!dimensions.exists(_.getColName.equals(columnName))) {
throw new MalformedCarbonCommandException("Error: Wrong Column Name " +
- dateFormatSplits(0) + " is provided in Option DateFormat.")
+ dateFormatSplits(0) +
+ " is provided in Option DateFormat.")
}
if (dateFormatSplits.length < 2 || dateFormatSplits(1).trim.isEmpty) {
throw new MalformedCarbonCommandException("Error: Option DateFormat is not provided " +
- "for " + "Column " + dateFormatSplits(0) + ".")
+ "for " + "Column " + dateFormatSplits(0) +
+ ".")
}
}
}
@@ -1279,8 +1278,7 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O
LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
CarbonEnv.getInstance(sqlContext).carbonCatalog.dropTable(storePath, identifier)(sqlContext)
LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
- }
- finally {
+ } finally {
if (carbonLock != null && isLocked) {
if (carbonLock.unlock()) {
logInfo("Table MetaData Unlocked Successfully after dropping the table")
@@ -1293,7 +1291,7 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O
CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
}
// delete bad record log after drop table
- val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + tableName)
+ val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + tableName)
val badLogFileType = FileFactory.getFileType(badLogPath)
if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
@@ -1353,8 +1351,7 @@ private[sql] case class ShowLoads(
try {
val lim = Integer.parseInt(limitLoads)
loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray.slice(0, lim)
- }
- catch {
+ } catch {
case ex: NumberFormatException => sys.error(s" Entered limit is not a valid Number")
}
@@ -1389,13 +1386,13 @@ private[sql] case class DescribeCommandFormatted(
var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
val comment = if (relation.metaData.dims.contains(field.name)) {
val dimension = relation.metaData.carbonTable.getDimensionByName(
- relation.tableMeta.carbonTableIdentifier.getTableName,
- field.name)
+ relation.tableMeta.carbonTableIdentifier.getTableName,
+ field.name)
if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
val colprop = mapper.writeValueAsString(dimension.getColumnProperties)
colProps.append(field.name).append(".")
- .append(mapper.writeValueAsString(dimension.getColumnProperties))
- .append(",")
+ .append(mapper.writeValueAsString(dimension.getColumnProperties))
+ .append(",")
}
if (dimension.hasEncoding(Encoding.DICTIONARY) &&
!dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
@@ -1415,11 +1412,11 @@ private[sql] case class DescribeCommandFormatted(
colProps.toString()
}
results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
- results ++= Seq(("Database Name : ", relation.tableMeta.carbonTableIdentifier
+ results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
.getDatabaseName, "")
)
- results ++= Seq(("Table Name : ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
- results ++= Seq(("CARBON Store Path : ", relation.tableMeta.storePath, ""))
+ results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
+ results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
val carbonTable = relation.tableMeta.carbonTable
results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
@@ -1438,7 +1435,7 @@ private[sql] case class DescribeCommandFormatted(
private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
var results: Seq[(String, String, String)] =
- Seq(("", "", ""), ("##Column Group Information", "", ""))
+ Seq(("", "", ""), ("##Column Group Information", "", ""))
val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
case (groupId, _) => groupId != -1
}.toSeq.sortBy(_._1)
@@ -1447,7 +1444,7 @@ private[sql] case class DescribeCommandFormatted(
})
var index = 1
groups.map { x =>
- results = results:+(s"Column Group $index", x, "")
+ results = results :+ (s"Column Group $index", x, "")
index = index + 1
}
results
@@ -1464,7 +1461,6 @@ private[sql] case class DeleteLoadByDate(
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
def run(sqlContext: SQLContext): Seq[Row] = {
-
val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
val identifier = TableIdentifier(tableName, Option(dbName))
@@ -1472,26 +1468,21 @@ private[sql] case class DeleteLoadByDate(
.lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
var level: String = ""
val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata
- .getInstance().getCarbonTable(dbName + '_' + tableName)
+ .getInstance().getCarbonTable(dbName + '_' + tableName)
if (relation == null) {
LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
sys.error(s"Table $dbName.$tableName does not exist")
}
-
val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
filter => filter.name.equalsIgnoreCase(dateField) &&
filter.dataType.isInstanceOf[TimestampType]).toList
-
if (matches.isEmpty) {
- LOGGER.audit(
- "The delete load by date is failed. " +
- "Table $dbName.$tableName does not contain date field :" + dateField)
- sys.error(s"Table $dbName.$tableName does not contain date field " + dateField)
- }
- else {
+ LOGGER.audit("The delete load by date is failed. " +
+ s"Table $dbName.$tableName does not contain date field: $dateField")
+ sys.error(s"Table $dbName.$tableName does not contain date field $dateField")
+ } else {
level = matches.asJava.get(0).name
}
-
val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level)
.getColName
CarbonDataRDDFactory.deleteLoadByDate(
@@ -1507,6 +1498,7 @@ private[sql] case class DeleteLoadByDate(
LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
Seq.empty
}
+
}
private[sql] case class CleanFiles(
@@ -1544,7 +1536,7 @@ private[sql] case class CleanFiles(
relation.tableMeta.partitioner)
LOGGER.audit(s"Clean files request is successfull for $dbName.$tableName.")
} catch {
- case ex : Exception =>
+ case ex: Exception =>
sys.error(ex.getMessage)
}
Seq.empty
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index d551e10..3fe62cc 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@ -55,9 +55,9 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])
case class CarbonMetaData(dims: Seq[String],
- msrs: Seq[String],
- carbonTable: CarbonTable,
- dictionaryMap: DictionaryMap)
+ msrs: Seq[String],
+ carbonTable: CarbonTable,
+ dictionaryMap: DictionaryMap)
case class TableMeta(carbonTableIdentifier: CarbonTableIdentifier, storePath: String,
var carbonTable: CarbonTable, partitioner: Partitioner)
@@ -176,12 +176,12 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
ZookeeperInit.getInstance(zookeeperUrl)
LOGGER.info("Zookeeper url is configured. Taking the zookeeper as lock type.")
var configuredLockType = CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.LOCK_TYPE)
+ .getProperty(CarbonCommonConstants.LOCK_TYPE)
if (null == configuredLockType) {
configuredLockType = CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
CarbonProperties.getInstance
- .addProperty(CarbonCommonConstants.LOCK_TYPE,
- configuredLockType)
+ .addProperty(CarbonCommonConstants.LOCK_TYPE,
+ configuredLockType)
}
}
@@ -214,7 +214,7 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
tableFolders.foreach(tableFolder => {
if (tableFolder.isDirectory) {
val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
- tableFolder.getName, UUID.randomUUID().toString)
+ tableFolder.getName, UUID.randomUUID().toString)
val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
carbonTableIdentifier)
val tableMetadataFile = carbonTablePath.getSchemaFilePath
@@ -260,22 +260,17 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
})
}
})
- }
- else {
+ } else {
// Create folders and files.
FileFactory.mkdirs(databasePath, fileType)
-
}
- }
- catch {
+ } catch {
case s: java.io.FileNotFoundException =>
// Create folders and files.
FileFactory.mkdirs(databasePath, fileType)
-
}
}
-
/**
*
* Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
@@ -286,11 +281,9 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
tableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo,
dbName: String, tableName: String, partitioner: Partitioner)
(sqlContext: SQLContext): String = {
-
if (tableExists(TableIdentifier(tableName, Some(dbName)))(sqlContext)) {
sys.error(s"Table [$tableName] already exists under Database [$dbName]")
}
-
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val thriftTableInfo = schemaConverter
.fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
@@ -299,14 +292,13 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
.add(schemaEvolutionEntry)
val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
- tableInfo.getFactTable.getTableId)
+ tableInfo.getFactTable.getTableId)
val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
val schemaFilePath = carbonTablePath.getSchemaFilePath
val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
tableInfo.setMetaDataFilepath(schemaMetadataPath)
tableInfo.setStorePath(storePath)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-
val tableMeta = TableMeta(
carbonTableIdentifier,
storePath,
@@ -318,15 +310,13 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
FileFactory.mkdirs(schemaMetadataPath, fileType)
}
-
val thriftWriter = new ThriftWriter(schemaFilePath, false)
thriftWriter.open()
thriftWriter.write(thriftTableInfo)
thriftWriter.close()
-
metadata.tablesMeta += tableMeta
logInfo(s"Table $tableName for Database $dbName created successfully.")
- LOGGER.info("Table " + tableName + " for Database " + dbName + " created successfully.")
+ LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
carbonTablePath.getPath
}
@@ -392,8 +382,7 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
if (c.carbonTableIdentifier.getDatabaseName.contains(name)) {
c.carbonTableIdentifier
.getDatabaseName
- }
- else {
+ } else {
null
}
case _ => c.carbonTableIdentifier.getDatabaseName
@@ -420,8 +409,8 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
def getAllTables()(sqlContext: SQLContext): Seq[TableIdentifier] = {
checkSchemasModifiedTimeAndReloadTables()
metadata.tablesMeta.map { c =>
- TableIdentifier(c.carbonTableIdentifier.getTableName,
- Some(c.carbonTableIdentifier.getDatabaseName))
+ TableIdentifier(c.carbonTableIdentifier.getTableName,
+ Some(c.carbonTableIdentifier.getDatabaseName))
}
}
@@ -526,7 +515,7 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
getLastModifiedTime ==
- tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
+ tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
refreshCache()
}
}
@@ -636,18 +625,18 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
object CarbonMetastoreTypes extends RegexParsers {
protected lazy val primitiveType: Parser[DataType] =
"string" ^^^ StringType |
- "float" ^^^ FloatType |
- "int" ^^^ IntegerType |
- "tinyint" ^^^ ShortType |
- "short" ^^^ ShortType |
- "double" ^^^ DoubleType |
- "long" ^^^ LongType |
- "binary" ^^^ BinaryType |
- "boolean" ^^^ BooleanType |
- fixedDecimalType |
- "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
- "varchar\\((\\d+)\\)".r ^^^ StringType |
- "timestamp" ^^^ TimestampType
+ "float" ^^^ FloatType |
+ "int" ^^^ IntegerType |
+ "tinyint" ^^^ ShortType |
+ "short" ^^^ ShortType |
+ "double" ^^^ DoubleType |
+ "long" ^^^ LongType |
+ "binary" ^^^ BinaryType |
+ "boolean" ^^^ BooleanType |
+ fixedDecimalType |
+ "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
+ "varchar\\((\\d+)\\)".r ^^^ StringType |
+ "timestamp" ^^^ TimestampType
protected lazy val fixedDecimalType: Parser[DataType] =
"decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 81abbfb..0c13293 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.spark.load.CarbonLoaderUtil
object DistributionUtil {
@transient
val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
+
/*
* This method will return the list of executers in the cluster.
* For this we take the memory status of all node with getExecutorMemoryStatus
@@ -62,13 +63,11 @@ object DistributionUtil {
addr.getHostName
}
nodeNames.toArray
- }
- else {
+ } else {
// For Standalone cluster, node IPs will be returned.
nodelist.toArray
}
- }
- else {
+ } else {
Seq(InetAddress.getLocalHost.getHostName).toArray
}
}
@@ -111,37 +110,41 @@ object DistributionUtil {
* @return
*/
def ensureExecutorsAndGetNodeList(blockList: Array[Distributable],
- sparkContext: SparkContext):
+ sparkContext: SparkContext):
Array[String] = {
val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
var confExecutorsTemp: String = null
if (sparkContext.getConf.contains("spark.executor.instances")) {
confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
} else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
- && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
- .equalsIgnoreCase("true")) {
+ && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
+ .equalsIgnoreCase("true")) {
if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
}
}
- val confExecutors = if (null != confExecutorsTemp) confExecutorsTemp.toInt else 1
+ val confExecutors = if (null != confExecutorsTemp) {
+ confExecutorsTemp.toInt
+ } else {
+ 1
+ }
val requiredExecutors = if (nodeMapping.size > confExecutors) {
confExecutors
- } else {nodeMapping.size()}
+ } else { nodeMapping.size() }
- val startTime = System.currentTimeMillis();
+ val startTime = System.currentTimeMillis()
CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
var nodes = DistributionUtil.getNodeList(sparkContext)
- var maxTimes = 30;
+ var maxTimes = 30
while (nodes.length < requiredExecutors && maxTimes > 0) {
- Thread.sleep(500);
+ Thread.sleep(500)
nodes = DistributionUtil.getNodeList(sparkContext)
- maxTimes = maxTimes - 1;
+ maxTimes = maxTimes - 1
}
- val timDiff = System.currentTimeMillis() - startTime;
- LOGGER.info("Total Time taken to ensure the required executors : " + timDiff)
- LOGGER.info("Time elapsed to allocate the required executors : " + (30 - maxTimes) * 500)
+ val timDiff = System.currentTimeMillis() - startTime
+ LOGGER.info(s"Total Time taken to ensure the required executors: $timDiff")
+ LOGGER.info(s"Time elapsed to allocate the required executors: ${ (30 - maxTimes) * 500 }")
nodes.distinct
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 368a1ad..d60bed4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -75,7 +75,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
def apply(plan: LogicalPlan): LogicalPlan = {
if (relations.nonEmpty && !isOptimized(plan)) {
LOGGER.info("Starting to optimize plan")
- val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("");
+ val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
val queryStatistic = new QueryStatistic()
val result = transformCarbonPlan(plan, relations)
queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ",
@@ -99,8 +99,8 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
case class ExtraNodeInfo(var hasCarbonRelation: Boolean)
def fillNodeInfo(
- plan: LogicalPlan,
- extraNodeInfos: java.util.HashMap[LogicalPlan, ExtraNodeInfo]): ExtraNodeInfo = {
+ plan: LogicalPlan,
+ extraNodeInfos: java.util.HashMap[LogicalPlan, ExtraNodeInfo]): ExtraNodeInfo = {
plan match {
case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
val extraNodeInfo = ExtraNodeInfo(true)
@@ -465,9 +465,10 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
decoder = true
cd
case currentPlan =>
- hasCarbonRelation(currentPlan) match {
- case true => addTempDecoder(currentPlan)
- case false => currentPlan
+ if (hasCarbonRelation(currentPlan)) {
+ addTempDecoder(currentPlan)
+ } else {
+ currentPlan
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
index b683629..e755b2e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -29,19 +29,18 @@ object FileUtils extends Logging {
* append all csv file path to a String, file path separated by comma
*/
private def getPathsFromCarbonFile(carbonFile: CarbonFile, stringBuild: StringBuilder): Unit = {
- carbonFile.isDirectory match {
- case true =>
+ if (carbonFile.isDirectory) {
val files = carbonFile.listFiles()
for (j <- 0 until files.size) {
getPathsFromCarbonFile(files(j), stringBuild)
}
- case false =>
+ } else {
val path = carbonFile.getAbsolutePath
val fileName = carbonFile.getName
if (carbonFile.getSize == 0) {
logWarning(s"skip empty input file: $path")
} else if (fileName.startsWith(CarbonCommonConstants.UNDERSCORE) ||
- fileName.startsWith(CarbonCommonConstants.POINT)) {
+ fileName.startsWith(CarbonCommonConstants.POINT)) {
logWarning(s"skip invisible input file: $path")
} else {
stringBuild.append(path.replace('\\', '/')).append(CarbonCommonConstants.COMMA)
@@ -71,7 +70,7 @@ object FileUtils extends Logging {
stringBuild.substring(0, stringBuild.size - 1)
} else {
throw new DataLoadingException("Please check your input path and make sure " +
- "that files end with '.csv' and content is not empty.")
+ "that files end with '.csv' and content is not empty.")
}
}
}
@@ -90,4 +89,5 @@ object FileUtils extends Logging {
size
}
}
+
}
[2/4] incubator-carbondata git commit: Improved spark module code. *
Removed some compilation warnings. * Replaced pattern matching on booleans with
IF-ELSE. * Improved code according to Scala standards. * Removed unnecessary
new lines. * Added string interpolation.
Posted by ja...@apache.org.
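This part of the commit touches CarbonMergerRDD and CarbonScanRDD, where the recurring cleanups are dropping redundant trailing semicolons and preferring nonEmpty over length > 0 / size > 0 checks. A small self-contained sketch of that style, again illustrative only; the names below are hypothetical, not code from the commit:

import scala.util.Random

object CleanupSketch {
  // Before: if (storeLocations != null && storeLocations.length > 0) { ... };
  // After:  keep the null check, use nonEmpty, and drop trailing semicolons.
  def pickStoreLocation(storeLocations: Array[String]): String = {
    if (storeLocations != null && storeLocations.nonEmpty) {
      storeLocations(Random.nextInt(storeLocations.length))
    } else {
      // Fall back to the JVM temp dir when no configured locations exist.
      System.getProperty("java.io.tmpdir")
    }
  }

  def main(args: Array[String]): Unit = {
    println(pickStoreLocation(Array("/data/store1", "/data/store2")))
    println(pickStoreLocation(Array.empty))
  }
}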
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index b8f4087..1a7aaf6 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -32,13 +32,15 @@ import org.apache.spark.sql.hive.DistributionUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TableBlockInfo, TableTaskInfo, TaskBlockInfo}
+import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties,
+TableBlockInfo, TableTaskInfo, TaskBlockInfo}
import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter
import org.apache.carbondata.core.carbon.path.CarbonTablePath
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException}
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger}
+import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor,
+CarbonCompactionUtil, RowResultMerger}
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
import org.apache.carbondata.scan.result.iterator.RawResultIterator
@@ -49,11 +51,11 @@ import org.apache.carbondata.spark.util.QueryPlanUtil
class CarbonMergerRDD[K, V](
- sc: SparkContext,
- result: MergeResult[K, V],
- carbonLoadModel: CarbonLoadModel,
- carbonMergerMapping : CarbonMergerMapping,
- confExecutorsTemp: String)
+ sc: SparkContext,
+ result: MergeResult[K, V],
+ carbonLoadModel: CarbonLoadModel,
+ carbonMergerMapping: CarbonMergerMapping,
+ confExecutorsTemp: String)
extends RDD[(K, V)](sc, Nil) with Logging {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
@@ -66,37 +68,37 @@ class CarbonMergerRDD[K, V](
val databaseName = carbonMergerMapping.databaseName
val factTableName = carbonMergerMapping.factTableName
val tableId = carbonMergerMapping.tableId
+
override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter = new Iterator[(K, V)] {
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
val tempLocationKey: String = CarbonCommonConstants
- .COMPACTION_KEY_WORD + '_' + carbonLoadModel
- .getDatabaseName + '_' + carbonLoadModel
- .getTableName + '_' + carbonLoadModel.getTaskNo
+ .COMPACTION_KEY_WORD + '_' + carbonLoadModel
+ .getDatabaseName + '_' + carbonLoadModel
+ .getTableName + '_' + carbonLoadModel.getTaskNo
// this property is used to determine whether temp location for carbon is inside
// container temp dir or is yarn application directory.
val carbonUseLocalDir = CarbonProperties.getInstance()
.getProperty("carbon.use.local.dir", "false")
- if(carbonUseLocalDir.equalsIgnoreCase("true")) {
+ if (carbonUseLocalDir.equalsIgnoreCase("true")) {
val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
- if (null != storeLocations && storeLocations.length > 0) {
+ if (null != storeLocations && storeLocations.nonEmpty) {
storeLocation = storeLocations(Random.nextInt(storeLocations.length))
}
if (storeLocation == null) {
storeLocation = System.getProperty("java.io.tmpdir")
}
- }
- else {
+ } else {
storeLocation = System.getProperty("java.io.tmpdir")
}
storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
- LOGGER.info("Temp storeLocation taken is " + storeLocation)
+ LOGGER.info(s"Temp storeLocation taken is $storeLocation")
var mergeStatus = false
var mergeNumber = ""
var exec: CarbonCompactionExecutor = null
@@ -111,7 +113,7 @@ class CarbonMergerRDD[K, V](
carbonMergerMapping.maxSegmentColCardinality)
// sorting the table block info List.
- var tableBlockInfoList = carbonSparkPartition.tableBlockInfos
+ val tableBlockInfoList = carbonSparkPartition.tableBlockInfos
Collections.sort(tableBlockInfoList)
@@ -123,7 +125,7 @@ class CarbonMergerRDD[K, V](
carbonLoadModel.setStorePath(storePath)
- exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
+ exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
factTableName, storePath, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
dataFileMetadataSegMapping
)
@@ -135,18 +137,18 @@ class CarbonMergerRDD[K, V](
} catch {
case e: Throwable =>
if (null != exec) {
- exec.finish
+ exec.finish()
}
LOGGER.error(e)
if (null != e.getMessage) {
- sys.error("Exception occurred in query execution :: " + e.getMessage)
+ sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
} else {
sys.error("Exception occurred in query execution.Please check logs.")
}
}
mergeNumber = mergedLoadName
.substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
- CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
+ CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
)
val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
@@ -170,13 +172,11 @@ class CarbonMergerRDD[K, V](
)
mergeStatus = merger.mergerSlice()
- }
- catch {
+ } catch {
case e: Exception =>
LOGGER.error(e)
throw e
- }
- finally {
+ } finally {
// delete temp location data
val newSlice = CarbonCommonConstants.LOAD_FOLDER + mergeNumber
try {
@@ -187,9 +187,9 @@ class CarbonMergerRDD[K, V](
case e: Exception =>
LOGGER.error(e)
}
- if (null != exec) {
- exec.finish
- }
+ if (null != exec) {
+ exec.finish
+ }
}
var finished = false
@@ -198,8 +198,7 @@ class CarbonMergerRDD[K, V](
if (!finished) {
finished = true
finished
- }
- else {
+ } else {
!finished
}
}
@@ -261,7 +260,7 @@ class CarbonMergerRDD[K, V](
)
// keep on assigning till last one is reached.
- if (null != blocksOfOneSegment && blocksOfOneSegment.size > 0) {
+ if (null != blocksOfOneSegment && blocksOfOneSegment.nonEmpty) {
blocksOfLastSegment = blocksOfOneSegment.asJava
}
@@ -273,8 +272,7 @@ class CarbonMergerRDD[K, V](
val blockListTemp = new util.ArrayList[TableBlockInfo]()
blockListTemp.add(tableBlockInfo)
taskIdMapping.put(taskNo, blockListTemp)
- }
- else {
+ } else {
blockList.add(tableBlockInfo)
}
}
@@ -288,8 +286,7 @@ class CarbonMergerRDD[K, V](
}
// prepare the details required to extract the segment properties using last segment.
- if (null != blocksOfLastSegment && blocksOfLastSegment.size > 0)
- {
+ if (null != blocksOfLastSegment && blocksOfLastSegment.size > 0) {
val lastBlockInfo = blocksOfLastSegment.get(blocksOfLastSegment.size - 1)
var dataFileFooter: DataFileFooter = null
@@ -317,8 +314,8 @@ class CarbonMergerRDD[K, V](
} else { nodeMapping.size() }
CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
logInfo("No.of Executors required=" + requiredExecutors
- + " , spark.executor.instances=" + confExecutors
- + ", no.of.nodes where data present=" + nodeMapping.size())
+ + " , spark.executor.instances=" + confExecutors
+ + ", no.of.nodes where data present=" + nodeMapping.size())
var nodes = DistributionUtil.getNodeList(sparkContext)
var maxTimes = 30
while (nodes.length < requiredExecutors && maxTimes > 0) {
@@ -341,11 +338,11 @@ class CarbonMergerRDD[K, V](
val list = new util.ArrayList[TableBlockInfo]
entry._2.asScala.foreach(taskInfo => {
- val blocksPerNode = taskInfo.asInstanceOf[TableTaskInfo]
- list.addAll(blocksPerNode.getTableBlockInfoList)
+ val blocksPerNode = taskInfo.asInstanceOf[TableTaskInfo]
+ list.addAll(blocksPerNode.getTableBlockInfoList)
taskBlockList
.add(new NodeInfo(blocksPerNode.getTaskId, blocksPerNode.getTableBlockInfoList.size))
- })
+ })
if (list.size() != 0) {
result.add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, list))
i += 1
@@ -354,25 +351,25 @@ class CarbonMergerRDD[K, V](
// print the node info along with task and number of blocks for the task.
- nodeTaskBlocksMap.asScala.foreach((entry : (String, List[NodeInfo])) => {
- logInfo(s"for the node ${entry._1}" )
+ nodeTaskBlocksMap.asScala.foreach((entry: (String, List[NodeInfo])) => {
+ logInfo(s"for the node ${ entry._1 }")
for (elem <- entry._2.asScala) {
logInfo("Task ID is " + elem.TaskId + "no. of blocks is " + elem.noOfBlocks)
}
- } )
+ })
val noOfNodes = nodes.length
val noOfTasks = result.size
logInfo(s"Identified no.of.Blocks: $noOfBlocks,"
+ s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
)
- logInfo("Time taken to identify Blocks to scan : " + (System
- .currentTimeMillis() - startTime)
+ logInfo("Time taken to identify Blocks to scan: " + (System
+ .currentTimeMillis() - startTime)
)
- for (j <- 0 until result.size ) {
+ for (j <- 0 until result.size) {
val cp = result.get(j).asInstanceOf[CarbonSparkPartition]
- logInfo(s"Node : " + cp.locations.toSeq.mkString(",")
- + ", No.Of Blocks : " + cp.tableBlockInfos.size
+ logInfo(s"Node: " + cp.locations.toSeq.mkString(",")
+ + ", No.Of Blocks: " + cp.tableBlockInfos.size
)
}
result.toArray(new Array[Partition](result.size))
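For reference, the context above waits in a bounded loop for the requested executors to come up before building partitions; a minimal standalone sketch of that bounded-retry shape, where the probe is only a placeholder standing in for DistributionUtil.getNodeList:
object BoundedWaitSketch {
  def main(args: Array[String]): Unit = {
    val requiredExecutors = 4
    var currentNodes = 1        // placeholder: would come from a cluster probe such as getNodeList
    var maxTimes = 30           // same upper bound as the surrounding code
    while (currentNodes < requiredExecutors && maxTimes > 0) {
      Thread.sleep(500)         // back off briefly between probes
      currentNodes += 1         // placeholder: re-query the cluster instead of incrementing
      maxTimes -= 1
    }
    println(s"proceeding with $currentNodes node(s) after ${ 30 - maxTimes } probe(s)")
  }
}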
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index c496099..d56b00f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -49,7 +49,6 @@ import org.apache.carbondata.spark.load.CarbonLoaderUtil
import org.apache.carbondata.spark.util.QueryPlanUtil
-
class CarbonSparkPartition(rddId: Int, val idx: Int,
val locations: Array[String],
val tableBlockInfos: util.List[TableBlockInfo])
@@ -92,7 +91,7 @@ class CarbonScanRDD[V: ClassTag](
val result = new util.ArrayList[Partition](defaultParallelism)
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val validAndInvalidSegments = new SegmentStatusManager(queryModel.getAbsoluteTableIdentifier)
- .getValidAndInvalidSegments
+ .getValidAndInvalidSegments
// set filter resolver tree
try {
// before applying filter check whether segments are available in the table.
@@ -110,11 +109,10 @@ class CarbonScanRDD[V: ClassTag](
queryModel.getAbsoluteTableIdentifier
)
}
- }
- catch {
+ } catch {
case e: Exception =>
LOGGER.error(e)
- sys.error("Exception occurred in query execution :: " + e.getMessage)
+ sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
}
// get splits
val splits = carbonInputFormat.getSplits(job)
@@ -129,8 +127,8 @@ class CarbonScanRDD[V: ClassTag](
)
)
var activeNodes = Array[String]()
- if(blockListTemp.nonEmpty) {
- activeNodes = DistributionUtil
+ if (blockListTemp.nonEmpty) {
+ activeNodes = DistributionUtil
.ensureExecutorsAndGetNodeList(blockListTemp.toArray, sparkContext)
}
defaultParallelism = sparkContext.defaultParallelism
@@ -141,9 +139,9 @@ class CarbonScanRDD[V: ClassTag](
var statistic = new QueryStatistic()
// group blocks to nodes, tasks
val nodeBlockMapping =
- CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
- activeNodes.toList.asJava
- )
+ CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
+ activeNodes.toList.asJava
+ )
statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
statisticRecorder.recordStatisticsForDriver(statistic, queryModel.getQueryId())
statistic = new QueryStatistic()
@@ -173,15 +171,14 @@ class CarbonScanRDD[V: ClassTag](
statisticRecorder.logStatisticsAsTableDriver()
result.asScala.foreach { r =>
val cp = r.asInstanceOf[CarbonSparkPartition]
- logInfo(s"Node : " + cp.locations.toSeq.mkString(",")
- + ", No.Of Blocks : " + cp.tableBlockInfos.size()
+ logInfo(s"Node: ${ cp.locations.toSeq.mkString(",") }" +
+ s", No.Of Blocks: ${ cp.tableBlockInfos.size() }"
)
}
} else {
logInfo("No blocks identified to scan")
}
- }
- else {
+ } else {
logInfo("No valid segments found to scan")
}
result.toArray(new Array[Partition](result.size()))
@@ -200,7 +197,7 @@ class CarbonScanRDD[V: ClassTag](
queryExecutor.finish
})
val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
- if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
+ if (!carbonSparkPartition.tableBlockInfos.isEmpty) {
queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
// fill table block info
queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
@@ -221,7 +218,7 @@ class CarbonScanRDD[V: ClassTag](
case e: Exception =>
LOGGER.error(e)
if (null != e.getMessage) {
- sys.error("Exception occurred in query execution :: " + e.getMessage)
+ sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
} else {
sys.error("Exception occurred in query execution.Please check logs.")
}
@@ -256,22 +253,22 @@ class CarbonScanRDD[V: ClassTag](
}
def logStatistics(): Unit = {
- if (null != queryModel.getStatisticsRecorder) {
- var queryStatistic = new QueryStatistic()
- queryStatistic
- .addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
- System.currentTimeMillis - queryStartTime
- )
- queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
- // result size
- queryStatistic = new QueryStatistic()
- queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
- queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
- // print executor query statistics for each task_id
- queryModel.getStatisticsRecorder.logStatisticsAsTableExecutor()
- }
+ if (null != queryModel.getStatisticsRecorder) {
+ var queryStatistic = new QueryStatistic()
+ queryStatistic
+ .addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+ System.currentTimeMillis - queryStartTime
+ )
+ queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
+ // result size
+ queryStatistic = new QueryStatistic()
+ queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
+ queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
+ // print executor query statistics for each task_id
+ queryModel.getStatisticsRecorder.logStatisticsAsTableExecutor()
}
}
+ }
iter
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 28c37f3..9c9be8d 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -50,18 +50,18 @@ object Compactor {
val carbonLoadModel = compactionCallableModel.carbonLoadModel
val compactionType = compactionCallableModel.compactionType
- val startTime = System.nanoTime();
+ val startTime = System.nanoTime()
val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
var finalMergeStatus = false
val schemaName: String = carbonLoadModel.getDatabaseName
val factTableName = carbonLoadModel.getTableName
val validSegments: Array[String] = CarbonDataMergerUtil
.getValidSegments(loadsToMerge).split(',')
- val mergeLoadStartTime = CarbonLoaderUtil.readCurrentTime();
+ val mergeLoadStartTime = CarbonLoaderUtil.readCurrentTime()
val carbonMergerMapping = CarbonMergerMapping(storeLocation,
storePath,
partitioner,
- carbonTable.getMetaDataFilepath(),
+ carbonTable.getMetaDataFilepath,
mergedLoadName,
kettleHomePath,
cubeCreationTime,
@@ -82,19 +82,19 @@ object Compactor {
)
)
carbonLoadModel.setLoadMetadataDetails(segmentStatusManager
- .readLoadMetadata(carbonTable.getMetaDataFilepath()).toList.asJava
+ .readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava
)
var execInstance = "1"
// in case of non dynamic executor allocation, number of executors are fixed.
if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
- logger.info("spark.executor.instances property is set to =" + execInstance)
+ logger.info(s"spark.executor.instances property is set to = $execInstance")
} // in case of dynamic executor allocation, taking the max executors of the dynamic allocation.
else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) {
if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
.equalsIgnoreCase("true")) {
execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
- logger.info("spark.dynamicAllocation.maxExecutors property is set to =" + execInstance)
+ logger.info(s"spark.dynamicAllocation.maxExecutors property is set to = $execInstance")
}
}
@@ -106,51 +106,38 @@ object Compactor {
execInstance
).collect
- if(mergeStatus.length == 0) {
+ if (mergeStatus.length == 0) {
finalMergeStatus = false
- }
- else {
+ } else {
finalMergeStatus = mergeStatus.forall(_._2)
}
if (finalMergeStatus) {
- val endTime = System.nanoTime();
- logger.info("time taken to merge " + mergedLoadName + " is " + (endTime - startTime))
+ val endTime = System.nanoTime()
+ logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
if (!CarbonDataMergerUtil
- .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath(),
+ .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
mergedLoadName, carbonLoadModel, mergeLoadStartTime, compactionType
)) {
- logger
- .audit("Compaction request failed for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
- logger
- .error("Compaction request failed for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
- throw new Exception("Compaction failed to update metadata for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName)
- }
- else {
- logger
- .audit("Compaction request completed for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
- logger
- .info("Compaction request completed for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ logger.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+ s"${ carbonLoadModel.getTableName }")
+ logger.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+ s"${ carbonLoadModel.getTableName }")
+ throw new Exception(s"Compaction failed to update metadata for table" +
+ s" ${ carbonLoadModel.getDatabaseName }." +
+ s"${ carbonLoadModel.getTableName }")
+ } else {
+ logger.audit(s"Compaction request completed for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ logger.info("Compaction request completed for table ${ carbonLoadModel.getDatabaseName } " +
+ s".${ carbonLoadModel.getTableName }")
}
- }
- else {
- logger
- .audit("Compaction request failed for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
- logger
- .error("Compaction request failed for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ } else {
+ logger.audit("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " +
+ s".${ carbonLoadModel.getTableName }"
+ )
+ logger.error("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " +
+ s".${ carbonLoadModel.getTableName }")
throw new Exception("Compaction Failure in Merger Rdd.")
}
}
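The Compactor hunks above replace string concatenation with Scala string interpolation; a minimal, self-contained sketch of the behaviour (the values below are made up for illustration), showing why the s prefix is required for ${...} to be substituted:
object InterpolationSketch {
  def main(args: Array[String]): Unit = {
    val databaseName = "default"   // illustrative value only
    val tableName = "sales"        // illustrative value only
    // without the s prefix the placeholder is kept as literal text
    println("Compaction request for table ${ databaseName }.${ tableName }")
    // with the s prefix the variables are substituted: "Compaction request for table default.sales"
    println(s"Compaction request for table ${ databaseName }.${ tableName }")
  }
}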
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
index d1b8bf3..e23b58d 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable
import org.apache.carbondata.common.factory.CarbonCommonFactory
import org.apache.carbondata.core.cache.dictionary.Dictionary
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.DataTypeUtil
import org.apache.carbondata.core.writer.CarbonDictionaryWriter
import org.apache.carbondata.spark.rdd.DictionaryLoadModel
@@ -63,8 +64,7 @@ class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String],
if (values.length >= 1) {
if (model.dictFileExists(columnIndex)) {
for (value <- values) {
- val parsedValue = org.apache.carbondata.core.util.DataTypeUtil
- .normalizeColumnValueForItsDataType(value,
+ val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value,
model.primDimensions(columnIndex))
if (null != parsedValue && dictionary.getSurrogateKey(parsedValue) ==
CarbonCommonConstants.INVALID_SURROGATE_KEY) {
@@ -75,8 +75,7 @@ class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String],
} else {
for (value <- values) {
- val parsedValue = org.apache.carbondata.core.util.DataTypeUtil
- .normalizeColumnValueForItsDataType(value,
+ val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value,
model.primDimensions(columnIndex))
if (null != parsedValue) {
writer.write(parsedValue)
@@ -88,8 +87,7 @@ class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String],
} catch {
case ex: IOException =>
throw ex
- }
- finally {
+ } finally {
if (null != writer) {
writer.close()
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index 30d5871..efedc91 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -45,7 +45,7 @@ object CarbonThriftServer {
} catch {
case e: Exception =>
val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- LOG.error("Wrong value for carbon.spark.warmUpTime " + warmUpTime +
+ LOG.error(s"Wrong value for carbon.spark.warmUpTime $warmUpTime " +
"Using default Value and proceeding")
Thread.sleep(30000)
}
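The CarbonThriftServer hunk logs a warning and falls back to a 30-second default when the configured warm-up time is unusable; a small sketch of that parse-with-fallback pattern (only the property name is taken from the diff, the rest is illustrative):
object WarmUpTimeSketch {
  def main(args: Array[String]): Unit = {
    val configured = "not-a-number"   // illustrative raw value of carbon.spark.warmUpTime
    val warmUpMillis =
      try {
        configured.toInt
      } catch {
        case e: NumberFormatException =>
          // mirror the fallback in the diff: warn and use the 30000 ms default
          System.err.println(s"Wrong value for carbon.spark.warmUpTime $configured. Using default")
          30000
      }
    println(s"warming up for $warmUpMillis ms before starting the Thrift server")
  }
}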
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 8b89f5d..d5051cf 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -36,7 +36,7 @@ object CommonUtil {
if (noDictionaryDims.contains(x)) {
throw new MalformedCarbonCommandException(
"Column group is not supported for no dictionary columns:" + x)
- } else if (msrs.filter { msr => msr.column.equals(x) }.nonEmpty) {
+ } else if (msrs.exists(msr => msr.column.equals(x))) {
// if column is measure
throw new MalformedCarbonCommandException("Column group is not supported for measures:" + x)
} else if (foundIndExistingColGrp(x)) {
@@ -47,11 +47,11 @@ object CommonUtil {
} else if (isTimeStampColumn(x, dims)) {
throw new MalformedCarbonCommandException(
"Column group doesn't support Timestamp datatype:" + x)
- }
- // if invalid column is present
- else if (dims.filter { dim => dim.column.equalsIgnoreCase(x) }.isEmpty) {
+ } else if (!dims.exists(dim => dim.column.equalsIgnoreCase(x))) {
+ // if invalid column is present
throw new MalformedCarbonCommandException(
- "column in column group is not a valid column :" + x
+ "column in column group is not a valid column: " + x
)
}
}
@@ -110,7 +110,7 @@ object CommonUtil {
key.length()), value))
}
}
- if (fieldProps.isEmpty()) {
+ if (fieldProps.isEmpty) {
None
} else {
Some(fieldProps)
@@ -211,7 +211,7 @@ object CommonUtil {
var tableBlockSize: Integer = 0
if (tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE).isDefined) {
val blockSizeStr: String =
- parsePropertyValueStringInMB(tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE).get)
+ parsePropertyValueStringInMB(tableProperties(CarbonCommonConstants.TABLE_BLOCKSIZE))
try {
tableBlockSize = Integer.parseInt(blockSizeStr)
} catch {
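The CommonUtil hunks swap filter { ... }.nonEmpty for exists and map.get(key).get for map(key); a short standalone sketch of why the forms are equivalent (the sample data is invented for illustration):
object CollectionIdiomSketch {
  def main(args: Array[String]): Unit = {
    val measures = Seq("price", "quantity")
    val column = "price"
    // exists answers the same question as filter(...).nonEmpty without building a temporary list
    assert(measures.filter(m => m.equals(column)).nonEmpty == measures.exists(m => m.equals(column)))
    val tableProperties = Map("table_blocksize" -> "1024")
    // map(key) and map.get(key).get behave the same when the key is present
    // (both throw NoSuchElementException when it is absent)
    assert(tableProperties("table_blocksize") == tableProperties.get("table_blocksize").get)
  }
}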
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
index 1fee428..aa8fcd5 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -35,7 +35,7 @@ object DataTypeConverterUtil {
case "timestamp" => DataType.TIMESTAMP
case "array" => DataType.ARRAY
case "struct" => DataType.STRUCT
- case _ => sys.error("Unsupported data type : " + dataType)
+ case _ => sys.error(s"Unsupported data type: $dataType")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index bd295bc..db01367 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -69,9 +69,9 @@ object GlobalDictionaryUtil extends Logging {
/**
* find columns which need to generate global dictionary.
*
- * @param dimensions dimension list of schema
- * @param headers column headers
- * @param columns column list of csv file
+ * @param dimensions dimension list of schema
+ * @param headers column headers
+ * @param columns column list of csv file
*/
def pruneDimensions(dimensions: Array[CarbonDimension],
headers: Array[String],
@@ -96,10 +96,10 @@ object GlobalDictionaryUtil extends Logging {
/**
* use this method to judge whether CarbonDimension use some encoding or not
- *
- * @param dimension carbonDimension
- * @param encoding the coding way of dimension
- * @param excludeEncoding the coding way to exclude
+ *
+ * @param dimension carbonDimension
+ * @param encoding the coding way of dimension
+ * @param excludeEncoding the coding way to exclude
*/
def hasEncoding(dimension: CarbonDimension,
encoding: Encoding,
@@ -129,7 +129,7 @@ object GlobalDictionaryUtil extends Logging {
if (dimension.hasEncoding(encoding) &&
(excludeEncoding == null || !dimension.hasEncoding(excludeEncoding))) {
if ((forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) != null) ||
- (!forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) == null)) {
+ (!forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) == null)) {
dimensionsWithEncoding += dimension
}
}
@@ -137,8 +137,8 @@ object GlobalDictionaryUtil extends Logging {
}
def getPrimDimensionWithDict(carbonLoadModel: CarbonLoadModel,
- dimension: CarbonDimension,
- forPreDefDict: Boolean): Array[CarbonDimension] = {
+ dimension: CarbonDimension,
+ forPreDefDict: Boolean): Array[CarbonDimension] = {
val dimensionsWithDict = new ArrayBuffer[CarbonDimension]
gatherDimensionByEncoding(carbonLoadModel, dimension, Encoding.DICTIONARY,
Encoding.DIRECT_DICTIONARY,
@@ -267,19 +267,20 @@ object GlobalDictionaryUtil extends Logging {
}
def isHighCardinalityColumn(columnCardinality: Int,
- rowCount: Long,
- model: DictionaryLoadModel): Boolean = {
- (columnCardinality > model.highCardThreshold) && (rowCount > 0) &&
- (columnCardinality.toDouble / rowCount * 100 > model.rowCountPercentage)
+ rowCount: Long,
+ model: DictionaryLoadModel): Boolean = {
+ (columnCardinality > model.highCardThreshold) &&
+ (rowCount > 0) &&
+ (columnCardinality.toDouble / rowCount * 100 > model.rowCountPercentage)
}
/**
* create a instance of DictionaryLoadModel
*
- * @param carbonLoadModel carbon load model
- * @param table CarbonTableIdentifier
- * @param dimensions column list
- * @param hdfsLocation store location in HDFS
+ * @param carbonLoadModel carbon load model
+ * @param table CarbonTableIdentifier
+ * @param dimensions column list
+ * @param hdfsLocation store location in HDFS
* @param dictfolderPath path of dictionary folder
*/
def createDictionaryLoadModel(carbonLoadModel: CarbonLoadModel,
@@ -300,7 +301,7 @@ object GlobalDictionaryUtil extends Logging {
val carbonTablePath = CarbonStorePath.getCarbonTablePath(hdfsLocation, table)
val primDimensions = primDimensionsBuffer.map { x => x }.toArray
val dictDetail = CarbonSparkFactory.getDictionaryDetailService().
- getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
+ getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
val dictFilePaths = dictDetail.dictFilePaths
val dictFileExists = dictDetail.dictFileExists
val columnIdentifier = dictDetail.columnIdentifiers
@@ -311,14 +312,14 @@ object GlobalDictionaryUtil extends Logging {
val zookeeperUrl = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.ZOOKEEPER_URL)
// load high cardinality identify configure
val highCardIdentifyEnable = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
- CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT).toBoolean
+ CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
+ CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT).toBoolean
val highCardThreshold = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD,
- CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT).toInt
+ CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD,
+ CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT).toInt
val rowCountPercentage = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
- CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT).toDouble
+ CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
+ CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT).toDouble
// get load count
if (null == carbonLoadModel.getLoadMetadataDetails) {
@@ -346,8 +347,8 @@ object GlobalDictionaryUtil extends Logging {
/**
* load CSV files to DataFrame by using datasource "com.databricks.spark.csv"
*
- * @param sqlContext SQLContext
- * @param carbonLoadModel carbon data load model
+ * @param sqlContext SQLContext
+ * @param carbonLoadModel carbon data load model
*/
def loadDataFrame(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel): DataFrame = {
@@ -356,16 +357,14 @@ object GlobalDictionaryUtil extends Logging {
.option("header", {
if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
"true"
- }
- else {
+ } else {
"false"
}
})
.option("delimiter", {
if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
"" + DEFAULT_SEPARATOR
- }
- else {
+ } else {
carbonLoadModel.getCsvDelimiter
}
})
@@ -377,8 +376,7 @@ object GlobalDictionaryUtil extends Logging {
.option("quote", {
if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) {
"" + DEFAULT_QUOTE_CHARACTER
- }
- else {
+ } else {
carbonLoadModel.getQuoteChar
}
})
@@ -388,9 +386,9 @@ object GlobalDictionaryUtil extends Logging {
}
private def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
- sqlContext: SQLContext,
- model: DictionaryLoadModel,
- noDictDimension: Array[CarbonDimension]): Unit = {
+ sqlContext: SQLContext,
+ model: DictionaryLoadModel,
+ noDictDimension: Array[CarbonDimension]): Unit = {
val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
model.table)
@@ -413,12 +411,11 @@ object GlobalDictionaryUtil extends Logging {
// update Metadata
val catalog = CarbonEnv.getInstance(sqlContext).carbonCatalog
catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
- model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
+ model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
// update CarbonDataLoadSchema
val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
- model.table.getTableName)(sqlContext)
- .asInstanceOf[CarbonRelation].tableMeta.carbonTable
+ model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
}
@@ -429,9 +426,9 @@ object GlobalDictionaryUtil extends Logging {
* @param status checking whether the generating is successful
*/
private def checkStatus(carbonLoadModel: CarbonLoadModel,
- sqlContext: SQLContext,
- model: DictionaryLoadModel,
- status: Array[(Int, String, Boolean)]) = {
+ sqlContext: SQLContext,
+ model: DictionaryLoadModel,
+ status: Array[(Int, String, Boolean)]) = {
var result = false
val noDictionaryColumns = new ArrayBuffer[CarbonDimension]
val tableName = model.table.getTableName
@@ -441,7 +438,9 @@ object GlobalDictionaryUtil extends Logging {
result = true
logError(s"table:$tableName column:$columnName generate global dictionary file failed")
}
- if (x._3) noDictionaryColumns += model.primDimensions(x._1)
+ if (x._3) {
+ noDictionaryColumns += model.primDimensions(x._1)
+ }
}
if (noDictionaryColumns.nonEmpty) {
updateTableMetadata(carbonLoadModel, sqlContext, model, noDictionaryColumns.toArray)
@@ -457,23 +456,23 @@ object GlobalDictionaryUtil extends Logging {
/**
* get external columns and whose dictionary file path
*
- * @param colDictFilePath external column dict file path
- * @param table table identifier
- * @param dimensions dimension columns
+ * @param colDictFilePath external column dict file path
+ * @param table table identifier
+ * @param dimensions dimension columns
*/
private def setPredefinedColumnDictPath(carbonLoadModel: CarbonLoadModel,
- colDictFilePath: String,
- table: CarbonTableIdentifier,
- dimensions: Array[CarbonDimension]) = {
+ colDictFilePath: String,
+ table: CarbonTableIdentifier,
+ dimensions: Array[CarbonDimension]) = {
val colFileMapArray = colDictFilePath.split(",")
for (colPathMap <- colFileMapArray) {
val colPathMapTrim = colPathMap.trim
val colNameWithPath = colPathMapTrim.split(":")
if (colNameWithPath.length == 1) {
logError("the format of external column dictionary should be " +
- "columnName:columnPath, please check")
+ "columnName:columnPath, please check")
throw new DataLoadingException("the format of predefined column dictionary" +
- " should be columnName:columnPath, please check")
+ " should be columnName:columnPath, please check")
}
setPredefineDict(carbonLoadModel, dimensions, table, colNameWithPath(0),
FileUtils.getPaths(colPathMapTrim.substring(colNameWithPath(0).length + 1)))
@@ -482,26 +481,25 @@ object GlobalDictionaryUtil extends Logging {
/**
* set pre defined dictionary for dimension
- *
- * @param dimensions all the dimensions
- * @param table carbon table identifier
- * @param colName user specified column name for predefined dict
- * @param colDictPath column dictionary file path
- * @param parentDimName parent dimenion for complex type
+ *
+ * @param dimensions all the dimensions
+ * @param table carbon table identifier
+ * @param colName user specified column name for predefined dict
+ * @param colDictPath column dictionary file path
+ * @param parentDimName parent dimension for complex type
*/
def setPredefineDict(carbonLoadModel: CarbonLoadModel,
- dimensions: Array[CarbonDimension],
- table: CarbonTableIdentifier,
- colName: String,
- colDictPath: String,
- parentDimName: String = "") {
+ dimensions: Array[CarbonDimension],
+ table: CarbonTableIdentifier,
+ colName: String,
+ colDictPath: String,
+ parentDimName: String = "") {
val middleDimName = colName.split("\\.")(0)
val dimParent = parentDimName + {
colName match {
case "" => colName
case _ =>
- if (parentDimName.isEmpty) middleDimName
- else "." + middleDimName
+ if (parentDimName.isEmpty) middleDimName else "." + middleDimName
}
}
// judge whether the column is exists
@@ -509,9 +507,10 @@ object GlobalDictionaryUtil extends Logging {
_.getColName.equalsIgnoreCase(dimParent))
if (preDictDimensionOption.length == 0) {
logError(s"Column $dimParent is not a key column " +
- s"in ${table.getDatabaseName}.${table.getTableName}")
+ s"in ${ table.getDatabaseName }.${ table.getTableName }")
throw new DataLoadingException(s"Column $dimParent is not a key column. " +
- s"Only key column can be part of dictionary and used in COLUMNDICT option.")
+ s"Only key column can be part of dictionary " +
+ s"and used in COLUMNDICT option.")
}
val preDictDimension = preDictDimensionOption(0)
if (preDictDimension.isComplex) {
@@ -520,9 +519,11 @@ object GlobalDictionaryUtil extends Logging {
val currentColName = {
preDictDimension.getDataType match {
case DataType.ARRAY =>
- if (children(0).isComplex) "val." +
- colName.substring(middleDimName.length + 1)
- else "val"
+ if (children(0).isComplex) {
+ "val." + colName.substring(middleDimName.length + 1)
+ } else {
+ "val"
+ }
case _ => colName.substring(middleDimName.length + 1)
}
}
@@ -534,27 +535,27 @@ object GlobalDictionaryUtil extends Logging {
}
/**
- * use external dimension column to generate global dictionary
+ * use external dimension column to generate global dictionary
*
- * @param colDictFilePath external column dict file path
- * @param table table identifier
- * @param dimensions dimension column
- * @param carbonLoadModel carbon load model
- * @param sqlContext spark sql context
- * @param hdfsLocation store location on hdfs
+ * @param colDictFilePath external column dict file path
+ * @param table table identifier
+ * @param dimensions dimension column
+ * @param carbonLoadModel carbon load model
+ * @param sqlContext spark sql context
+ * @param hdfsLocation store location on hdfs
* @param dictFolderPath generated global dict file path
*/
private def generatePredefinedColDictionary(colDictFilePath: String,
- table: CarbonTableIdentifier,
- dimensions: Array[CarbonDimension],
- carbonLoadModel: CarbonLoadModel,
- sqlContext: SQLContext,
- hdfsLocation: String,
- dictFolderPath: String) = {
+ table: CarbonTableIdentifier,
+ dimensions: Array[CarbonDimension],
+ carbonLoadModel: CarbonLoadModel,
+ sqlContext: SQLContext,
+ hdfsLocation: String,
+ dictFolderPath: String) = {
// set pre defined dictionary column
setPredefinedColumnDictPath(carbonLoadModel, colDictFilePath, table, dimensions)
val dictLoadModel = createDictionaryLoadModel(carbonLoadModel, table, dimensions,
- hdfsLocation, dictFolderPath, true)
+ hdfsLocation, dictFolderPath, forPreDefDict = true)
// new RDD to achieve distributed column dict generation
val extInputRDD = new CarbonColumnDictGenerateRDD(carbonLoadModel, dictLoadModel,
sqlContext.sparkContext, table, dimensions, hdfsLocation, dictFolderPath)
@@ -600,7 +601,7 @@ object GlobalDictionaryUtil extends Logging {
* @param csvFileColumns
*/
private def parseRecord(x: String, accum: Accumulator[Int],
- csvFileColumns: Array[String]) : (String, String) = {
+ csvFileColumns: Array[String]): (String, String) = {
val tokens = x.split("" + DEFAULT_SEPARATOR)
var columnName: String = ""
var value: String = ""
@@ -610,7 +611,7 @@ object GlobalDictionaryUtil extends Logging {
accum += 1
} else if (tokens.size == 1) {
// such as "1", "jone", throw ex
- if (x.contains(",") == false) {
+ if (!x.contains(",")) {
accum += 1
} else {
try {
@@ -644,10 +645,10 @@ object GlobalDictionaryUtil extends Logging {
* @return allDictionaryRdd
*/
private def readAllDictionaryFiles(sqlContext: SQLContext,
- csvFileColumns: Array[String],
- requireColumns: Array[String],
- allDictionaryPath: String,
- accumulator: Accumulator[Int]) = {
+ csvFileColumns: Array[String],
+ requireColumns: Array[String],
+ allDictionaryPath: String,
+ accumulator: Accumulator[Int]) = {
var allDictionaryRdd: RDD[(String, Iterable[String])] = null
try {
// read local dictionary file, and spilt (columnIndex, columnValue)
@@ -686,7 +687,7 @@ object GlobalDictionaryUtil extends Logging {
true
} else {
logWarning("No dictionary files found or empty dictionary files! " +
- "Won't generate new dictionary.")
+ "Won't generate new dictionary.")
false
}
} else {
@@ -699,7 +700,7 @@ object GlobalDictionaryUtil extends Logging {
true
} else {
logWarning("No dictionary files found or empty dictionary files! " +
- "Won't generate new dictionary.")
+ "Won't generate new dictionary.")
false
}
} else {
@@ -726,7 +727,7 @@ object GlobalDictionaryUtil extends Logging {
} else {
carbonLoadModel.getCsvDelimiter
}
- headers = readLine.toLowerCase().split(delimiter);
+ headers = readLine.toLowerCase().split(delimiter)
} else {
logError("Not found file header! Please set fileheader")
throw new IOException("Failed to get file header")
@@ -737,13 +738,13 @@ object GlobalDictionaryUtil extends Logging {
/**
* generate global dictionary with SQLContext and CarbonLoadModel
*
- * @param sqlContext sql context
- * @param carbonLoadModel carbon load model
+ * @param sqlContext sql context
+ * @param carbonLoadModel carbon load model
*/
def generateGlobalDictionary(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- storePath: String,
- dataFrame: Option[DataFrame] = None): Unit = {
+ carbonLoadModel: CarbonLoadModel,
+ storePath: String,
+ dataFrame: Option[DataFrame] = None): Unit = {
try {
var carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
var carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
@@ -757,7 +758,7 @@ object GlobalDictionaryUtil extends Logging {
carbonLoadModel.initPredefDictMap()
val allDictionaryPath = carbonLoadModel.getAllDictPath
- if(StringUtils.isEmpty(allDictionaryPath)) {
+ if (StringUtils.isEmpty(allDictionaryPath)) {
logInfo("Generate global dictionary from source data files!")
// load data by using dataSource com.databricks.spark.csv
var df = if (dataFrame.isDefined) {
@@ -767,8 +768,7 @@ object GlobalDictionaryUtil extends Logging {
}
var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
df.columns
- }
- else {
+ } else {
carbonLoadModel.getCsvHeader.split("" + DEFAULT_SEPARATOR)
}
headers = headers.map(headerName => headerName.trim)
@@ -779,8 +779,9 @@ object GlobalDictionaryUtil extends Logging {
dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
}
if (headers.length > df.columns.length) {
- val msg = "The number of columns in the file header do not match the number of " +
- "columns in the data file; Either delimiter or fileheader provided is not correct"
+ val msg = "The number of columns in the file header do not match the " +
+ "number of columns in the data file; Either delimiter " +
+ "or fileheader provided is not correct"
logError(msg)
throw new DataLoadingException(msg)
}
@@ -829,7 +830,7 @@ object GlobalDictionaryUtil extends Logging {
} else {
logInfo("Generate global dictionary from dictionary files!")
val isNonempty = validateAllDictionaryPath(allDictionaryPath)
- if(isNonempty) {
+ if (isNonempty) {
var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
getHeaderFormFactFile(carbonLoadModel)
} else {
@@ -837,8 +838,7 @@ object GlobalDictionaryUtil extends Logging {
}
headers = headers.map(headerName => headerName.trim)
// prune columns according to the CSV file header, dimension columns
- val (requireDimension, requireColumnNames) =
- pruneDimensions(dimensions, headers, headers)
+ val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, headers, headers)
if (requireDimension.nonEmpty) {
val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
requireDimension, storePath, dictfolderPath, false)
@@ -857,7 +857,7 @@ object GlobalDictionaryUtil extends Logging {
// if the dictionary contains wrong format record, throw ex
if (accumulator.value > 0) {
throw new DataLoadingException("Data Loading failure, dictionary values are " +
- "not in correct format!")
+ "not in correct format!")
}
} else {
logInfo("have no column need to generate global dictionary")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index e27d166..a792eca 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -44,14 +44,14 @@ class CarbonContext(
metaStorePath: String) extends HiveContext(sc) with Logging {
self =>
- def this (sc: SparkContext) = {
- this (sc,
+ def this(sc: SparkContext) = {
+ this(sc,
new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath,
new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
}
- def this (sc: SparkContext, storePath: String) = {
- this (sc,
+ def this(sc: SparkContext, storePath: String) = {
+ this(sc,
storePath,
new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
}
@@ -107,9 +107,9 @@ class CarbonContext(
if (sc.hadoopConfiguration.get(CarbonCommonConstants.HIVE_CONNECTION_URL) == null) {
val metaStorePathAbsolute = new File(metaStorePath).getCanonicalPath
val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
- logDebug(s"metastore db is going to be created in location : $hiveMetaStoreDB")
+ logDebug(s"metastore db is going to be created in location: $hiveMetaStoreDB")
super.configure() ++ Map((CarbonCommonConstants.HIVE_CONNECTION_URL,
- s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true"),
+ s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true"),
("hive.metastore.warehouse.dir", metaStorePathAbsolute + "/hivemetadata"))
} else {
super.configure()
@@ -186,7 +186,7 @@ object CarbonContext {
/**
*
* Requesting the extra executors other than the existing ones.
- *
+ *
* @param sc
* @param numExecutors
* @return
@@ -194,12 +194,12 @@ object CarbonContext {
final def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = {
sc.schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
- val requiredExecutors = numExecutors - b.numExistingExecutors
+ val requiredExecutors = numExecutors - b.numExistingExecutors
LOGGER
- .info("number of executors is =" + numExecutors + " existing executors are =" + b
- .numExistingExecutors
+ .info(s"number of executors is =$numExecutors existing executors are =" +
+ s"${ b.numExistingExecutors }"
)
- if(requiredExecutors > 0) {
+ if (requiredExecutors > 0) {
b.requestExecutors(requiredExecutors)
}
true
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index c1a8818..f4fe900 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -111,10 +111,10 @@ case class CarbonDictionaryDecoder(
case DataType.TIMESTAMP => TimestampType
case DataType.STRUCT =>
CarbonMetastoreTypes
- .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
+ .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
case DataType.ARRAY =>
CarbonMetastoreTypes
- .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+ .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
}
}
@@ -131,7 +131,7 @@ case class CarbonDictionaryDecoder(
carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
(carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
- carbonDimension.getDataType)
+ carbonDimension.getDataType)
} else {
(null, null, null)
}
@@ -160,7 +160,7 @@ case class CarbonDictionaryDecoder(
(carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
}.toMap
- val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId);
+ val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId)
if (isRequiredToDecode) {
val dataTypes = child.output.map { attr => attr.dataType }
child.execute().mapPartitions { iter =>
@@ -186,7 +186,7 @@ case class CarbonDictionaryDecoder(
var total = 0L
override final def hasNext: Boolean = {
flag = iter.hasNext
- if (false == flag && total > 0) {
+ if (!flag && total > 0) {
val queryStatistic = new QueryStatistic()
queryStatistic
.addFixedTimeStatistic(QueryStatisticsConstants.PREPARE_RESULT, total)
@@ -202,8 +202,8 @@ case class CarbonDictionaryDecoder(
dictIndex.foreach { index =>
if (data(index) != null) {
data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
- .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
- getDictionaryColumnIds(index)._3)
+ .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+ getDictionaryColumnIds(index)._3)
}
}
val result = unsafeProjection(new GenericMutableRow(data))
@@ -231,7 +231,7 @@ case class CarbonDictionaryDecoder(
if (f._2 != null) {
try {
cache.get(new DictionaryColumnUniqueIdentifier(
- atiMap.get(f._1).get.getCarbonTableIdentifier,
+ atiMap(f._1).getCarbonTableIdentifier,
f._2, f._3))
} catch {
case _: Throwable => null
@@ -242,4 +242,5 @@ case class CarbonDictionaryDecoder(
}
dicts
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 6f149f7..f9a0a9d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -171,39 +171,48 @@ class CarbonSqlParser()
}
import lexical.Identifier
- implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
- s"identifier matching regex ${regex}",
+
+ implicit def regexToParser(regex: Regex): Parser[String] = {
+ acceptMatch(
+ s"identifier matching regex ${ regex }",
{ case Identifier(str) if regex.unapplySeq(str).isDefined => str }
- )
- override def parse(input: String): LogicalPlan = synchronized {
- // Initialize the Keywords.
- initLexical
- phrase(start)(new lexical.Scanner(input)) match {
- case Success(plan, _) => plan match {
- case x: LoadTable =>
- x.inputSqlString = input
- x
- case logicalPlan => logicalPlan
+ )
+ }
+
+ override def parse(input: String): LogicalPlan = {
+ synchronized {
+ // Initialize the Keywords.
+ initLexical
+ phrase(start)(new lexical.Scanner(input)) match {
+ case Success(plan, _) => plan match {
+ case x: LoadTable =>
+ x.inputSqlString = input
+ x
+ case logicalPlan => logicalPlan
+ }
+ case failureOrError => sys.error(failureOrError.toString)
}
- case failureOrError => sys.error(failureOrError.toString)
}
}
/**
* This will convert key word to regular expression.
+ *
* @param keys
* @return
*/
- private def carbonKeyWord(keys: String) =
+ private def carbonKeyWord(keys: String) = {
("(?i)" + keys).r
+ }
override protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
protected lazy val startCommand: Parser[LogicalPlan] = createDatabase | dropDatabase |
- loadManagement | describeTable | showLoads | alterTable | createTable
+ loadManagement | describeTable |
+ showLoads | alterTable | createTable
protected lazy val loadManagement: Parser[LogicalPlan] = deleteLoadsByID | deleteLoadsByLoadDate |
- cleanFiles | loadDataNew
+ cleanFiles | loadDataNew
protected val escapedIdentifier = "`([^`]+)`".r
@@ -248,9 +257,9 @@ class CarbonSqlParser()
var dimensions: Seq[Field] = Seq()
dims.foreach { dimension =>
dimension.dataType.getOrElse("NIL") match {
- case "Array" => complexDimensions = complexDimensions:+dimension
- case "Struct" => complexDimensions = complexDimensions:+dimension
- case _ => dimensions = dimensions:+dimension
+ case "Array" => complexDimensions = complexDimensions :+ dimension
+ case "Struct" => complexDimensions = complexDimensions :+ dimension
+ case _ => dimensions = dimensions :+ dimension
}
}
dimensions ++ complexDimensions
@@ -276,22 +285,22 @@ class CarbonSqlParser()
* For handling the create table DDl systax compatible to Hive syntax
*/
protected lazy val createTable: Parser[LogicalPlan] =
- restInput ^^ {
+ restInput ^^ {
- case statement =>
- try {
- // DDl will be parsed and we get the AST tree from the HiveQl
- val node = HiveQlWrapper.getAst(statement)
- // processing the AST tree
- nodeToPlan(node)
- } catch {
- // MalformedCarbonCommandException need to be throw directly, parser will catch it
- case ce: MalformedCarbonCommandException =>
- throw ce
- case e: Exception =>
- sys.error("Parsing error") // no need to do anything.
- }
- }
+ case statement =>
+ try {
+ // DDl will be parsed and we get the AST tree from the HiveQl
+ val node = HiveQlWrapper.getAst(statement)
+ // processing the AST tree
+ nodeToPlan(node)
+ } catch {
+ // MalformedCarbonCommandException need to be throw directly, parser will catch it
+ case ce: MalformedCarbonCommandException =>
+ throw ce
+ case e: Exception =>
+ sys.error("Parsing error") // no need to do anything.
+ }
+ }
private def getScaleAndPrecision(dataType: String): (Int, Int) = {
val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
@@ -313,17 +322,17 @@ class CarbonSqlParser()
case Token("TOK_CREATETABLE", children) =>
- var fields: Seq[Field] = Seq[Field]()
- var tableComment: String = ""
- var tableProperties = Map[String, String]()
- var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
- var likeTableName: String = ""
- var storedBy: String = ""
- var ifNotExistPresent: Boolean = false
- var dbName: Option[String] = None
- var tableName: String = ""
+ var fields: Seq[Field] = Seq[Field]()
+ var tableComment: String = ""
+ var tableProperties = Map[String, String]()
+ var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
+ var likeTableName: String = ""
+ var storedBy: String = ""
+ var ifNotExistPresent: Boolean = false
+ var dbName: Option[String] = None
+ var tableName: String = ""
- try {
+ try {
// Checking whether create table request is carbon table
children.collect {
@@ -332,7 +341,7 @@ class CarbonSqlParser()
case _ =>
}
if (!(storedBy.equals(CarbonContext.datasourceName) ||
- storedBy.equals(CarbonContext.datasourceShortName))) {
+ storedBy.equals(CarbonContext.datasourceShortName))) {
sys.error("Not a carbon format request")
}
@@ -362,7 +371,7 @@ class CarbonSqlParser()
match {
case Success(field, _) => field
case failureOrError => throw new MalformedCarbonCommandException(
- s"Unsupported data type : $col.getType")
+ s"Unsupported data type: $col.getType")
}
// the data type of the decimal type will be like decimal(10,0)
// so checking the start of the string and taking the precision and scale.
@@ -405,7 +414,7 @@ class CarbonSqlParser()
if (repeatedProperties.nonEmpty) {
val repeatedPropStr: String = repeatedProperties.mkString(",")
throw new MalformedCarbonCommandException("Table properties is repeated: " +
- repeatedPropStr)
+ repeatedPropStr)
}
tableProperties ++= propertySeq
@@ -430,12 +439,17 @@ class CarbonSqlParser()
// get logical plan.
CreateTable(tableModel)
- }
- catch {
+ } catch {
case ce: MalformedCarbonCommandException =>
- val message = if (tableName.isEmpty) "Create table command failed. "
- else if (dbName.isEmpty) s"Create table command failed for $tableName. "
- else s"Create table command failed for ${dbName.get}.$tableName. "
+ val message = if (tableName.isEmpty) {
+ "Create table command failed. "
+ }
+ else if (dbName.isEmpty) {
+ s"Create table command failed for $tableName. "
+ }
+ else {
+ s"Create table command failed for ${ dbName.get }.$tableName. "
+ }
LOGGER.audit(message + ce.getMessage)
throw ce
}
@@ -488,19 +502,24 @@ class CarbonSqlParser()
* @return
*/
protected def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
- , tableName: String, fields: Seq[Field],
- partitionCols: Seq[PartitionerField],
- tableProperties: Map[String, String]): tableModel
+ , tableName: String, fields: Seq[Field],
+ partitionCols: Seq[PartitionerField],
+ tableProperties: Map[String, String]): tableModel
= {
val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
fields, tableProperties)
if (dims.isEmpty) {
- throw new MalformedCarbonCommandException(s"Table ${dbName.getOrElse(
- CarbonCommonConstants.DATABASE_DEFAULT_NAME)}.$tableName"
- + " can not be created without key columns. Please use DICTIONARY_INCLUDE or " +
- "DICTIONARY_EXCLUDE to set at least one key column " +
- "if all specified columns are numeric types")
+ throw new MalformedCarbonCommandException(s"Table ${
+ dbName.getOrElse(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME)
+ }.$tableName"
+ +
+ " can not be created without key columns. Please " +
+ "use DICTIONARY_INCLUDE or " +
+ "DICTIONARY_EXCLUDE to set at least one key " +
+ "column " +
+ "if all specified columns are numeric types")
}
val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
@@ -508,7 +527,7 @@ class CarbonSqlParser()
val colProps = extractColumnProperties(fields, tableProperties)
// get column groups configuration from table properties.
val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties,
- noDictionaryDims, msrs, dims)
+ noDictionaryDims, msrs, dims)
// get no inverted index columns from table properties.
val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
@@ -554,8 +573,7 @@ class CarbonSqlParser()
}
// This will be further handled.
CommonUtil.arrangeColGrpsInSchemaOrder(splittedColGrps, dims)
- }
- else {
+ } else {
null
}
}
@@ -603,7 +621,7 @@ class CarbonSqlParser()
* @return
*/
protected def getPartitionerObject(partitionCols: Seq[PartitionerField],
- tableProperties: Map[String, String]):
+ tableProperties: Map[String, String]):
Option[Partitioner] = {
// by default setting partition class empty.
@@ -649,8 +667,8 @@ class CarbonSqlParser()
}
protected def fillAllChildrenColumnProperty(parent: String, fieldChildren: Option[List[Field]],
- tableProperties: Map[String, String],
- colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
+ tableProperties: Map[String, String],
+ colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
fieldChildren.foreach(fields => {
fields.foreach(field => {
fillColumnProperty(Some(parent), field.column, tableProperties, colPropMap)
@@ -661,9 +679,9 @@ class CarbonSqlParser()
}
protected def fillColumnProperty(parentColumnName: Option[String],
- columnName: String,
- tableProperties: Map[String, String],
- colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
+ columnName: String,
+ tableProperties: Map[String, String],
+ colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
val (tblPropKey, colProKey) = getKey(parentColumnName, columnName)
val colProps = CommonUtil.getColumnProperties(tblPropKey, tableProperties)
if (colProps.isDefined) {
@@ -672,7 +690,7 @@ class CarbonSqlParser()
}
def getKey(parentColumnName: Option[String],
- columnName: String): (String, String) = {
+ columnName: String): (String, String) = {
if (parentColumnName.isDefined) {
if (columnName == "val") {
(parentColumnName.get, parentColumnName.get + "." + columnName)
@@ -683,6 +701,7 @@ class CarbonSqlParser()
(columnName, columnName)
}
}
+
/**
* This will extract the no inverted columns fields.
* By default all dimensions use inverted index.
@@ -692,7 +711,7 @@ class CarbonSqlParser()
* @return
*/
protected def extractNoInvertedIndexColumns(fields: Seq[Field],
- tableProperties: Map[String, String]):
+ tableProperties: Map[String, String]):
Seq[String] = {
// check whether the column name is in fields
var noInvertedIdxColsProps: Array[String] = Array[String]()
@@ -703,17 +722,17 @@ class CarbonSqlParser()
tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
noInvertedIdxColsProps
.map { noInvertedIdxColProp =>
- if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
- val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
- " does not exist in table. Please check create table statement."
- throw new MalformedCarbonCommandException(errormsg)
+ if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
+ val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
+ " does not exist in table. Please check create table statement."
+ throw new MalformedCarbonCommandException(errormsg)
+ }
}
- }
}
// check duplicate columns and only 1 col left
val distinctCols = noInvertedIdxColsProps.toSet
// extract the no inverted index columns
- fields.foreach( field => {
+ fields.foreach(field => {
if (distinctCols.exists(x => x.equalsIgnoreCase(field.column))) {
noInvertedIdxCols :+= field.column
}
@@ -731,7 +750,7 @@ class CarbonSqlParser()
* @return
*/
protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
- tableProperties: Map[String, String]):
+ tableProperties: Map[String, String]):
(Seq[Field], Seq[String]) = {
var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
var dictExcludeCols: Array[String] = Array[String]()
@@ -746,18 +765,18 @@ class CarbonSqlParser()
.map { dictExcludeCol =>
if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
- " does not exist in table. Please check create table statement."
+ " does not exist in table. Please check create table statement."
throw new MalformedCarbonCommandException(errormsg)
} else {
- val dataType = fields.find (x =>
+ val dataType = fields.find(x =>
x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
if (isComplexDimDictionaryExclude(dataType)) {
val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " +
- dictExcludeCol
+ dictExcludeCol
throw new MalformedCarbonCommandException(errormsg)
} else if (!isStringAndTimestampColDictionaryExclude(dataType)) {
val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
- " data type column: " + dictExcludeCol
+ " data type column: " + dictExcludeCol
throw new MalformedCarbonCommandException(errorMsg)
}
}
@@ -768,19 +787,19 @@ class CarbonSqlParser()
dictIncludeCols =
tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
dictIncludeCols.map { distIncludeCol =>
- if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
- val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
- " does not exist in table. Please check create table statement."
- throw new MalformedCarbonCommandException(errormsg)
- }
+ if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
+ val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
+ " does not exist in table. Please check create table statement."
+ throw new MalformedCarbonCommandException(errormsg)
}
+ }
}
// include cols should contain exclude cols
dictExcludeCols.foreach { dicExcludeCol =>
if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
- " with DICTIONARY_INCLUDE. Please check create table statement."
+ " with DICTIONARY_INCLUDE. Please check create table statement."
throw new MalformedCarbonCommandException(errormsg)
}
}
@@ -794,11 +813,9 @@ class CarbonSqlParser()
noDictionaryDims :+= field.column
}
dimFields += field
- }
- else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
+ } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
dimFields += (field)
- }
- else if (isDetectAsDimentionDatatype(field.dataType.get)) {
+ } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
dimFields += (field)
}
}
@@ -806,11 +823,12 @@ class CarbonSqlParser()
(dimFields.toSeq, noDictionaryDims)
}
+
/**
* It fills non string dimensions in dimFields
*/
def fillNonStringDimension(dictIncludeCols: Seq[String],
- field: Field, dimFields: LinkedHashSet[Field]) {
+ field: Field, dimFields: LinkedHashSet[Field]) {
var dictInclude = false
if (dictIncludeCols.nonEmpty) {
dictIncludeCols.foreach(dictIncludeCol =>
@@ -841,9 +859,9 @@ class CarbonSqlParser()
dimensionType.exists(x => x.equalsIgnoreCase(dimensionDataType))
}
- /**
- * detects whether double or decimal column is part of dictionary_exclude
- */
+ /**
+ * detects whether double or decimal column is part of dictionary_exclude
+ */
def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
val dataTypes = Array("string", "timestamp")
dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
@@ -857,7 +875,7 @@ class CarbonSqlParser()
* @return
*/
protected def extractMsrColsFromFields(fields: Seq[Field],
- tableProperties: Map[String, String]): Seq[Field] = {
+ tableProperties: Map[String, String]): Seq[Field] = {
var msrFields: Seq[Field] = Seq[Field]()
var dictIncludedCols: Array[String] = Array[String]()
var dictExcludedCols: Array[String] = Array[String]()
@@ -877,10 +895,10 @@ class CarbonSqlParser()
// by default consider all non string cols as msrs. consider all include/ exclude cols as dims
fields.foreach(field => {
if (!isDetectAsDimentionDatatype(field.dataType.get)) {
- if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
+ if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
!dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
- msrFields :+= field
- }
+ msrFields :+= field
+ }
}
})
@@ -905,35 +923,43 @@ class CarbonSqlParser()
(db, tableName)
}
- protected def cleanIdentifier(ident: String): String = ident match {
- case escapedIdentifier(i) => i
- case plainIdent => plainIdent
+ protected def cleanIdentifier(ident: String): String = {
+ ident match {
+ case escapedIdentifier(i) => i
+ case plainIdent => plainIdent
+ }
}
protected def getClauses(clauseNames: Seq[String], nodeList: Seq[ASTNode]): Seq[Option[Node]] = {
var remainingNodes = nodeList
val clauses = clauseNames.map { clauseName =>
val (matches, nonMatches) = remainingNodes.partition(_.getText.toUpperCase == clauseName)
- remainingNodes = nonMatches ++ (if (matches.nonEmpty) matches.tail else Nil)
+ remainingNodes = nonMatches ++ (if (matches.nonEmpty) {
+ matches.tail
+ } else {
+ Nil
+ })
matches.headOption
}
if (remainingNodes.nonEmpty) {
sys.error(
s"""Unhandled clauses:
- |You are likely trying to use an unsupported carbon feature."""".stripMargin)
+ |You are likely trying to use an unsupported carbon feature."""".stripMargin)
}
clauses
}
object Token {
/** @return matches of the form (tokenName, children). */
- def unapply(t: Any): Option[(String, Seq[ASTNode])] = t match {
- case t: ASTNode =>
- CurrentOrigin.setPosition(t.getLine, t.getCharPositionInLine)
- Some((t.getText,
- Option(t.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]]))
- case _ => None
+ def unapply(t: Any): Option[(String, Seq[ASTNode])] = {
+ t match {
+ case t: ASTNode =>
+ CurrentOrigin.setPosition(t.getLine, t.getCharPositionInLine)
+ Some((t.getText,
+ Option(t.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]]))
+ case _ => None
+ }
}
}
@@ -943,35 +969,39 @@ class CarbonSqlParser()
* @param node
* @return
*/
- protected def getProperties(node: Node): Seq[(String, String)] = node match {
- case Token("TOK_TABLEPROPLIST", list) =>
- list.map {
- case Token("TOK_TABLEPROPERTY", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
- (unquoteString(key) -> unquoteString(value))
- }
+ protected def getProperties(node: Node): Seq[(String, String)] = {
+ node match {
+ case Token("TOK_TABLEPROPLIST", list) =>
+ list.map {
+ case Token("TOK_TABLEPROPERTY", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
+ (unquoteString(key) -> unquoteString(value))
+ }
+ }
}
- protected def unquoteString(str: String) = str match {
- case singleQuotedString(s) => s.toLowerCase()
- case doubleQuotedString(s) => s.toLowerCase()
- case other => other
+ protected def unquoteString(str: String) = {
+ str match {
+ case singleQuotedString(s) => s.toLowerCase()
+ case doubleQuotedString(s) => s.toLowerCase()
+ case other => other
+ }
}
protected lazy val loadDataNew: Parser[LogicalPlan] =
LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
- (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
- (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
- case filePath ~ isOverwrite ~ table ~ optionsList =>
- val (databaseNameOp, tableName) = table match {
- case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
- }
- if(optionsList.isDefined) {
- validateOptions(optionsList)
- }
- val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
- LoadTable(databaseNameOp, tableName, filePath, Seq(), optionsMap,
- isOverwrite.isDefined)
- }
+ (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
+ (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
+ case filePath ~ isOverwrite ~ table ~ optionsList =>
+ val (databaseNameOp, tableName) = table match {
+ case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
+ }
+ if (optionsList.isDefined) {
+ validateOptions(optionsList)
+ }
+ val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
+ LoadTable(databaseNameOp, tableName, filePath, Seq(), optionsMap,
+ isOverwrite.isDefined)
+ }
private def validateOptions(optionList: Option[List[(String, String)]]): Unit = {
@@ -999,9 +1029,9 @@ class CarbonSqlParser()
// COLUMNDICT and ALL_DICTIONARY_PATH can not be used together.
if (options.exists(_._1.equalsIgnoreCase("COLUMNDICT")) &&
- options.exists(_._1.equalsIgnoreCase("ALL_DICTIONARY_PATH"))) {
+ options.exists(_._1.equalsIgnoreCase("ALL_DICTIONARY_PATH"))) {
val errorMessage = "Error: COLUMNDICT and ALL_DICTIONARY_PATH can not be used together" +
- " in options"
+ " in options"
throw new MalformedCarbonCommandException(errorMessage)
}
@@ -1052,20 +1082,21 @@ class CarbonSqlParser()
protected lazy val primitiveTypes =
STRING ^^^ "string" | INTEGER ^^^ "integer" | TIMESTAMP ^^^
- "timestamp" | NUMERIC ^^^ "numeric" | BIGINT ^^^ "bigint" |
- INT ^^^ "int" | DOUBLE ^^^ "double" | decimalType
+ "timestamp" | NUMERIC ^^^ "numeric" |
+ BIGINT ^^^ "bigint" |
+ INT ^^^ "int" | DOUBLE ^^^ "double" | decimalType
/**
* Matching the decimal(10,0) data type and returning the same.
*/
private lazy val decimalType =
- DECIMAL ~ ("(" ~> numericLit <~",") ~ (numericLit <~ ")") ^^ {
- case decimal ~ precision ~scale =>
- s"$decimal($precision, $scale)"
- }
+ DECIMAL ~ ("(" ~> numericLit <~ ",") ~ (numericLit <~ ")") ^^ {
+ case decimal ~ precision ~ scale =>
+ s"$decimal($precision, $scale)"
+ }
protected lazy val nestedType: Parser[Field] = structFieldType | arrayFieldType |
- primitiveFieldType
+ primitiveFieldType
protected lazy val anyFieldDef: Parser[Field] =
(ident | stringLit) ~ ((":").? ~> nestedType) ~ (IN ~> (ident | stringLit)).? ^^ {
@@ -1095,8 +1126,8 @@ class CarbonSqlParser()
protected lazy val measureCol: Parser[Field] =
(ident | stringLit) ~ (INTEGER ^^^ "integer" | NUMERIC ^^^ "numeric" |
- BIGINT ^^^ "bigint" | DECIMAL ^^^ "decimal").? ~
- (AS ~> (ident | stringLit)).? ~ (IN ~> (ident | stringLit)).? ^^ {
+ BIGINT ^^^ "bigint" | DECIMAL ^^^ "decimal").? ~
+ (AS ~> (ident | stringLit)).? ~ (IN ~> (ident | stringLit)).? ^^ {
case e1 ~ e2 ~ e3 ~ e4 => Field(e1, e2, e3, Some(null))
}
@@ -1112,11 +1143,11 @@ class CarbonSqlParser()
if (ef.isDefined && "FORMATTED".equalsIgnoreCase(ef.get)) {
new DescribeFormattedCommand("describe formatted " + tblIdentifier,
tblIdentifier)
- }
- else {
+ } else {
new DescribeCommand(UnresolvedRelation(tblIdentifier, None), ef.isDefined)
}
}
+
private def normalizeType(field: Field): Field = {
val dataType = field.dataType.getOrElse("NIL")
dataType match {
@@ -1217,22 +1248,23 @@ class CarbonSqlParser()
protected lazy val showLoads: Parser[LogicalPlan] =
SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
- (LIMIT ~> numericLit).? <~
- opt(";") ^^ {
+ (LIMIT ~> numericLit).? <~
+ opt(";") ^^ {
case databaseName ~ tableName ~ limit =>
ShowLoadsCommand(databaseName, tableName.toLowerCase(), limit)
}
protected lazy val segmentId: Parser[String] =
numericLit ^^ { u => u } |
- elem("decimal", p => {
- p.getClass.getSimpleName.equals("FloatLit") ||
- p.getClass.getSimpleName.equals("DecimalLit") } ) ^^ (_.chars)
+ elem("decimal", p => {
+ p.getClass.getSimpleName.equals("FloatLit") ||
+ p.getClass.getSimpleName.equals("DecimalLit")
+ }) ^^ (_.chars)
protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
- (ident <~ ".").? ~ ident) <~
- opt(";") ^^ {
+ (ident <~ ".").? ~ ident) <~
+ opt(";") ^^ {
case loadids ~ table => table match {
case databaseName ~ tableName =>
DeleteLoadsById(loadids, databaseName, tableName.toLowerCase())
@@ -1241,8 +1273,8 @@ class CarbonSqlParser()
protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
DELETE ~> SEGMENTS ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
- (WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~
- opt(";") ^^ {
+ (WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~
+ opt(";") ^^ {
case schema ~ table ~ condition =>
condition match {
case dateField ~ dateValue =>
@@ -1261,6 +1293,7 @@ class CarbonSqlParser()
logicalPlan match {
case plan: CreateTable => ExplainCommand(logicalPlan, extended = isExtended.isDefined)
case _ => ExplainCommand(OneRowRelation)
- }
+ }
}
+
}
[4/4] incubator-carbondata git commit: [CARBONDATA-328] [Spark]
Improve Code and Fix Warnings. This closes #279
Posted by ja...@apache.org.
[CARBONDATA-328] [Spark] Improve Code and Fix Warnings. This closes #279
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/0a8e782f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/0a8e782f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/0a8e782f
Branch: refs/heads/master
Commit: 0a8e782ffd6c7aa958e291cd4f403859a8dd4104
Parents: c5176f3 6391c2b
Author: jackylk <ja...@huawei.com>
Authored: Sat Nov 19 10:16:23 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sat Nov 19 10:16:23 2016 +0800
----------------------------------------------------------------------
.../examples/AllDictionaryExample.scala | 2 +
.../carbondata/examples/CarbonExample.scala | 2 +
.../spark/sql/common/util/QueryTest.scala | 6 +-
.../spark/CarbonDataFrameWriter.scala | 4 +-
.../apache/carbondata/spark/CarbonFilters.scala | 34 +-
.../spark/rdd/CarbonCleanFilesRDD.scala | 2 +-
.../spark/rdd/CarbonDataLoadRDD.scala | 267 ++++----
.../spark/rdd/CarbonDataRDDFactory.scala | 619 +++++++++----------
.../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 2 +-
.../spark/rdd/CarbonDeleteLoadRDD.scala | 2 +-
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 101 +--
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 91 ++-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 57 +-
.../apache/carbondata/spark/rdd/Compactor.scala | 71 +--
.../spark/tasks/DictionaryWriterTask.scala | 10 +-
.../spark/thriftserver/CarbonThriftServer.scala | 2 +-
.../carbondata/spark/util/CommonUtil.scala | 14 +-
.../spark/util/DataTypeConverterUtil.scala | 2 +-
.../spark/util/GlobalDictionaryUtil.scala | 212 +++----
.../org/apache/spark/sql/CarbonContext.scala | 22 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 17 +-
.../org/apache/spark/sql/CarbonSqlParser.scala | 347 ++++++-----
.../spark/sql/SparkUnknownExpression.scala | 53 +-
.../execution/command/carbonTableSchema.scala | 270 ++++----
.../spark/sql/hive/CarbonMetastoreCatalog.scala | 65 +-
.../spark/sql/hive/DistributionUtil.scala | 35 +-
.../spark/sql/optimizer/CarbonOptimizer.scala | 13 +-
.../scala/org/apache/spark/util/FileUtils.scala | 10 +-
28 files changed, 1141 insertions(+), 1191 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-carbondata git commit: Improved Spark module code. *
Removed some compilation warnings. * Replaced pattern matching on booleans with
IF-ELSE. * Improved code according to Scala standards. * Removed unnecessary
new lines. * Added string inter
Posted by ja...@apache.org.
Improved Spark module code.
* Removed some compilation warnings.
* Replaced pattern matching on booleans with IF-ELSE.
* Improved code according to Scala standards.
* Removed unnecessary new lines.
* Added string interpolation instead of string concatenation.
* Removed unnecessary semicolons.
* Fixed indentation.
* Added a useKettle option for loading.
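
A minimal, hypothetical before/after sketch in Scala of the two most common patterns touched by this change: pattern matching on a Boolean replaced with a plain if-else, and string concatenation replaced with string interpolation. The names used here (isDirectLoad, databaseName, tableName) are illustrative only and are not taken from this patch.

// Hypothetical sketch, not code from this commit.
object RefactoringSketch {

  // Before: matching on a Boolean and concatenating strings.
  def describeLoadOld(isDirectLoad: Boolean, databaseName: String, tableName: String): String = {
    isDirectLoad match {
      case true => "Direct load for " + databaseName + "." + tableName
      case false => "Staged load for " + databaseName + "." + tableName
    }
  }

  // After: plain if-else plus string interpolation.
  def describeLoadNew(isDirectLoad: Boolean, databaseName: String, tableName: String): String = {
    if (isDirectLoad) {
      s"Direct load for $databaseName.$tableName"
    } else {
      s"Staged load for $databaseName.$tableName"
    }
  }

  def main(args: Array[String]): Unit = {
    // Both variants print the same text; only the style differs.
    println(describeLoadOld(isDirectLoad = true, databaseName = "default", tableName = "t3"))
    println(describeLoadNew(isDirectLoad = true, databaseName = "default", tableName = "t3"))
  }
}

The if-else form is the idiomatic way to branch on a Boolean in Scala, and interpolation keeps the longer log and error messages readable; the same two patterns recur throughout the hunks below.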
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6391c2be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6391c2be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6391c2be
Branch: refs/heads/master
Commit: 6391c2be31f347688a3dbe9f9657e3dd75158684
Parents: c5176f3
Author: Prabhat Kashyap <pr...@knoldus.in>
Authored: Wed Oct 19 22:24:47 2016 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Sat Nov 19 09:51:04 2016 +0800
----------------------------------------------------------------------
.../examples/AllDictionaryExample.scala | 2 +
.../carbondata/examples/CarbonExample.scala | 2 +
.../spark/sql/common/util/QueryTest.scala | 6 +-
.../spark/CarbonDataFrameWriter.scala | 4 +-
.../apache/carbondata/spark/CarbonFilters.scala | 34 +-
.../spark/rdd/CarbonCleanFilesRDD.scala | 2 +-
.../spark/rdd/CarbonDataLoadRDD.scala | 267 ++++----
.../spark/rdd/CarbonDataRDDFactory.scala | 619 +++++++++----------
.../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 2 +-
.../spark/rdd/CarbonDeleteLoadRDD.scala | 2 +-
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 101 +--
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 91 ++-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 57 +-
.../apache/carbondata/spark/rdd/Compactor.scala | 71 +--
.../spark/tasks/DictionaryWriterTask.scala | 10 +-
.../spark/thriftserver/CarbonThriftServer.scala | 2 +-
.../carbondata/spark/util/CommonUtil.scala | 14 +-
.../spark/util/DataTypeConverterUtil.scala | 2 +-
.../spark/util/GlobalDictionaryUtil.scala | 212 +++----
.../org/apache/spark/sql/CarbonContext.scala | 22 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 17 +-
.../org/apache/spark/sql/CarbonSqlParser.scala | 347 ++++++-----
.../spark/sql/SparkUnknownExpression.scala | 53 +-
.../execution/command/carbonTableSchema.scala | 270 ++++----
.../spark/sql/hive/CarbonMetastoreCatalog.scala | 65 +-
.../spark/sql/hive/DistributionUtil.scala | 35 +-
.../spark/sql/optimizer/CarbonOptimizer.scala | 13 +-
.../scala/org/apache/spark/util/FileUtils.scala | 10 +-
28 files changed, 1141 insertions(+), 1191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
index dcdf41f..9fecadb 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
@@ -21,6 +21,7 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.examples.util.{AllDictionaryUtil, ExampleUtils}
object AllDictionaryExample {
+
def main(args: Array[String]) {
val cc = ExampleUtils.createCarbonContext("CarbonExample")
val testData = ExampleUtils.currentPath + "/src/main/resources/data.csv"
@@ -57,4 +58,5 @@ object AllDictionaryExample {
// clean local dictionary files
AllDictionaryUtil.cleanDictionary(allDictFile)
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 038f609..f98d46d 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.examples.util.ExampleUtils
object CarbonExample {
+
def main(args: Array[String]) {
val cc = ExampleUtils.createCarbonContext("CarbonExample")
val testData = ExampleUtils.currentPath + "/src/main/resources/data.csv"
@@ -73,4 +74,5 @@ object CarbonExample {
// Drop table
cc.sql("DROP TABLE IF EXISTS t3")
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index f9960d3..587013f 100644
--- a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -140,7 +140,7 @@ object QueryTest {
|$e
|${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
""".stripMargin
- return Some(errorMessage)
+ Some(errorMessage)
}
if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
@@ -157,9 +157,9 @@ object QueryTest {
prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")
}
""".stripMargin
- return Some(errorMessage)
+ Some(errorMessage)
}
- return None
+ None
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index a02751e..3596393 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -126,8 +126,8 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) extends Logging {
options.tableName,
null,
Seq(),
- Map(("fileheader" -> header)),
- false,
+ Map("fileheader" -> header),
+ isOverwriteExist = false,
null,
Some(dataFrame)).run(cc)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 711c51c..3162f80 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -121,8 +121,8 @@ object CarbonFilters {
expr match {
case or@ Or(left, right) =>
- val leftFilter = translate(left, true)
- val rightFilter = translate(right, true)
+ val leftFilter = translate(left, or = true)
+ val rightFilter = translate(right, or = true)
if (leftFilter.isDefined && rightFilter.isDefined) {
Some( sources.Or(leftFilter.get, rightFilter.get))
} else {
@@ -265,29 +265,27 @@ object CarbonFilters {
Some(new EqualToExpression(transformExpression(child).get,
transformExpression(Literal(null)).get, true))
case Not(In(a: Attribute, list))
- if !list.exists(!_.isInstanceOf[Literal]) =>
- if (list.exists(x => (isNullLiteral(x.asInstanceOf[Literal])))) {
- Some(new FalseExpression(transformExpression(a).get))
- }
- else {
- Some(new NotInExpression(transformExpression(a).get,
+ if !list.exists(!_.isInstanceOf[Literal]) =>
+ if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+ Some(new FalseExpression(transformExpression(a).get))
+ } else {
+ Some(new NotInExpression(transformExpression(a).get,
new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
- }
+ }
case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
Some(new InExpression(transformExpression(a).get,
new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
case Not(In(Cast(a: Attribute, _), list))
if !list.exists(!_.isInstanceOf[Literal]) =>
- /* if any illogical expression comes in NOT IN Filter like
- NOT IN('scala',NULL) this will be treated as false expression and will
- always return no result. */
- if (list.exists(x => (isNullLiteral(x.asInstanceOf[Literal])))) {
- Some(new FalseExpression(transformExpression(a).get))
- }
- else {
- Some(new NotInExpression(transformExpression(a).get, new ListExpression(
+ /* if any illogical expression comes in NOT IN Filter like
+ NOT IN('scala',NULL) this will be treated as false expression and will
+ always return no result. */
+ if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+ Some(new FalseExpression(transformExpression(a).get))
+ } else {
+ Some(new NotInExpression(transformExpression(a).get, new ListExpression(
convertToJavaList(list.map(transformExpression(_).get)))))
- }
+ }
case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
Some(new InExpression(transformExpression(a).get,
new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index 3ba32d2..3a5d952 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -76,7 +76,7 @@ class CarbonCleanFilesRDD[V: ClassTag](
override def getPreferredLocations(split: Partition): Seq[String] = {
val theSplit = split.asInstanceOf[CarbonLoadPartition]
val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
- logInfo("Host Name : " + s.head + s.length)
+ logInfo("Host Name: " + s.head + s.length)
s
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 856e67c..2a36f30 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -26,7 +26,8 @@ import java.util.UUID
import scala.collection.JavaConverters._
import scala.util.Random
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
+TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.command.Partitioner
@@ -78,11 +79,11 @@ class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
}
class SparkPartitionLoader(model: CarbonLoadModel,
- splitIndex: Int,
- storePath: String,
- kettleHomePath: String,
- loadCount: Int,
- loadMetadataDetails: LoadMetadataDetails) extends Logging{
+ splitIndex: Int,
+ storePath: String,
+ kettleHomePath: String,
+ loadCount: Int,
+ loadMetadataDetails: LoadMetadataDetails) extends Logging {
var storeLocation: String = ""
@@ -106,7 +107,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
// container temp dir or is yarn application directory.
val carbonUseLocalDir = CarbonProperties.getInstance()
.getProperty("carbon.use.local.dir", "false")
- if(carbonUseLocalDir.equalsIgnoreCase("true")) {
+ if (carbonUseLocalDir.equalsIgnoreCase("true")) {
val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
if (null != storeLocations && storeLocations.nonEmpty) {
storeLocation = storeLocations(Random.nextInt(storeLocations.length))
@@ -114,8 +115,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
if (storeLocation == null) {
storeLocation = System.getProperty("java.io.tmpdir")
}
- }
- else {
+ } else {
storeLocation = System.getProperty("java.io.tmpdir")
}
storeLocation = storeLocation + '/' + System.nanoTime() + '/' + splitIndex
@@ -127,7 +127,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
kettleHomePath)
} catch {
case e: DataLoadingException => if (e.getErrorCode ==
- DataProcessorConstants.BAD_REC_FOUND) {
+ DataProcessorConstants.BAD_REC_FOUND) {
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
logInfo("Bad Record Found")
} else {
@@ -160,6 +160,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
}
}
}
+
/**
* Use this RDD class to load csv data file
*
@@ -171,7 +172,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
* @param partitioner Partitioner which specify how to partition
* @param columinar whether it is columinar
* @param loadCount Current load count
- * @param tableCreationTime Time of creating table
+ * @param tableCreationTime Time of creating table
* @param schemaLastUpdatedTime Time of last schema update
* @param blocksGroupBy Blocks Array which is group by partition or host
* @param isTableSplitPartition Whether using table split partition
@@ -195,30 +196,29 @@ class DataFileLoaderRDD[K, V](
sc.setLocalProperty("spark.scheduler.pool", "DDL")
override def getPartitions: Array[Partition] = {
- isTableSplitPartition match {
- case true =>
- // for table split partition
- var splits = Array[TableSplit]()
- if (carbonLoadModel.isDirectLoad) {
- splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
- partitioner.nodeList, partitioner.partitionCount)
- }
- else {
- splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, null, partitioner)
- }
+ if (isTableSplitPartition) {
+ // for table split partition
+ var splits = Array[TableSplit]()
+ if (carbonLoadModel.isDirectLoad) {
+ splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
+ partitioner.nodeList, partitioner.partitionCount)
+ } else {
+ splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName, null, partitioner)
+ }
- splits.zipWithIndex.map {s =>
- // filter the same partition unique id, because only one will match, so get 0 element
- val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
- p._1 == s._1.getPartition.getUniqueID)(0)._2
- new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
- }
- case false =>
- // for node partition
- blocksGroupBy.zipWithIndex.map{b =>
- new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
- }
+ splits.zipWithIndex.map { case (split, index) =>
+ // filter the same partition unique id, because only one will match, so get 0 element
+ val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter { case (uniqueId, _) =>
+ uniqueId == split.getPartition.getUniqueID
+ }(0)._2
+ new CarbonTableSplitPartition(id, index, split, blocksDetails)
+ }
+ } else {
+ // for node partition
+ blocksGroupBy.zipWithIndex.map { case ((uniqueId, blockDetails), index) =>
+ new CarbonNodePartition(id, index, uniqueId, blockDetails)
+ }
}
}
@@ -242,16 +242,14 @@ class DataFileLoaderRDD[K, V](
setModelAndBlocksInfo()
val loader = new SparkPartitionLoader(model, theSplit.index, storePath,
kettleHomePath, loadCount, loadMetadataDetails)
- loader.initialize
+ loader.initialize()
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
if (model.isRetentionRequest) {
recreateAggregationTableForRetention
- }
- else if (model.isAggLoadRequest) {
+ } else if (model.isAggLoadRequest) {
loadMetadataDetails.setLoadStatus(createManualAggregateTable)
- }
- else {
- loader.run
+ } else {
+ loader.run()
}
} catch {
case e: Exception =>
@@ -261,52 +259,50 @@ class DataFileLoaderRDD[K, V](
}
def setModelAndBlocksInfo(): Unit = {
- isTableSplitPartition match {
- case true =>
- // for table split partition
- val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
- logInfo("Input split: " + split.serializableHadoopSplit.value)
- val blocksID = gernerateBlocksID
- carbonLoadModel.setBlocksID(blocksID)
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- if (carbonLoadModel.isDirectLoad) {
- model = carbonLoadModel.getCopyWithPartition(
- split.serializableHadoopSplit.value.getPartition.getUniqueID,
- split.serializableHadoopSplit.value.getPartition.getFilesPath,
- carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
- } else {
- model = carbonLoadModel.getCopyWithPartition(
- split.serializableHadoopSplit.value.getPartition.getUniqueID)
- }
- partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
- // get this partition data blocks and put it to global static map
- GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail)
- StandardLogService.setThreadName(partitionID, null)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordPartitionBlockMap(
- partitionID, split.partitionBlocksDetail.length)
- case false =>
- // for node partition
- val split = theSplit.asInstanceOf[CarbonNodePartition]
- logInfo("Input split: " + split.serializableHadoopSplit)
- logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordHostBlockMap(
- split.serializableHadoopSplit, split.nodeBlocksDetail.length)
- val blocksID = gernerateBlocksID
- carbonLoadModel.setBlocksID(blocksID)
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- // set this node blocks info to global static map
- GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail)
- if (carbonLoadModel.isDirectLoad) {
- val filelist: java.util.List[String] = new java.util.ArrayList[String](
- CarbonCommonConstants.CONSTANT_SIZE_TEN)
- CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
- model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
- carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
- }
- else {
- model = carbonLoadModel.getCopyWithPartition(partitionID)
- }
- StandardLogService.setThreadName(blocksID, null)
+ if (isTableSplitPartition) {
+ // for table split partition
+ val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
+ logInfo("Input split: " + split.serializableHadoopSplit.value)
+ val blocksID = gernerateBlocksID
+ carbonLoadModel.setBlocksID(blocksID)
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ if (carbonLoadModel.isDirectLoad) {
+ model = carbonLoadModel.getCopyWithPartition(
+ split.serializableHadoopSplit.value.getPartition.getUniqueID,
+ split.serializableHadoopSplit.value.getPartition.getFilesPath,
+ carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+ } else {
+ model = carbonLoadModel.getCopyWithPartition(
+ split.serializableHadoopSplit.value.getPartition.getUniqueID)
+ }
+ partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
+ // get this partition data blocks and put it to global static map
+ GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail)
+ StandardLogService.setThreadName(partitionID, null)
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
+ partitionID, split.partitionBlocksDetail.length)
+ } else {
+ // for node partition
+ val split = theSplit.asInstanceOf[CarbonNodePartition]
+ logInfo("Input split: " + split.serializableHadoopSplit)
+ logInfo("The Block Count in this node: " + split.nodeBlocksDetail.length)
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
+ split.serializableHadoopSplit, split.nodeBlocksDetail.length)
+ val blocksID = gernerateBlocksID
+ carbonLoadModel.setBlocksID(blocksID)
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ // set this node blocks info to global static map
+ GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail)
+ if (carbonLoadModel.isDirectLoad) {
+ val filelist: java.util.List[String] = new java.util.ArrayList[String](
+ CarbonCommonConstants.CONSTANT_SIZE_TEN)
+ CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
+ model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
+ carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+ } else {
+ model = carbonLoadModel.getCopyWithPartition(partitionID)
+ }
+ StandardLogService.setThreadName(blocksID, null)
}
}
@@ -316,14 +312,13 @@ class DataFileLoaderRDD[K, V](
* @return
*/
def gernerateBlocksID: String = {
- isTableSplitPartition match {
- case true =>
- carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
- theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
- .getPartition.getUniqueID + "_" + UUID.randomUUID()
- case false =>
- carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
- UUID.randomUUID()
+ if (isTableSplitPartition) {
+ carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+ theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
+ .getPartition.getUniqueID + "_" + UUID.randomUUID()
+ } else {
+ carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+ UUID.randomUUID()
}
}
@@ -351,8 +346,7 @@ class DataFileLoaderRDD[K, V](
CarbonLoaderUtil
.removeSliceFromMemory(model.getDatabaseName, model.getTableName, newSlice)
logInfo(s"Aggregate table creation failed")
- }
- else {
+ } else {
logInfo("Aggregate tables creation successfull")
}
}
@@ -425,6 +419,7 @@ class DataFileLoaderRDD[K, V](
}
var finished = false
+
override def hasNext: Boolean = {
!finished
}
@@ -438,46 +433,46 @@ class DataFileLoaderRDD[K, V](
}
override def getPreferredLocations(split: Partition): Seq[String] = {
- isTableSplitPartition match {
- case true =>
- // for table split partition
- val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
- val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
- location
- case false =>
- // for node partition
- val theSplit = split.asInstanceOf[CarbonNodePartition]
- val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
- logInfo("Preferred Location for split : " + firstOptionLocation(0))
- val blockMap = new util.LinkedHashMap[String, Integer]()
- val tableBlocks = theSplit.blocksDetails
- tableBlocks.foreach(tableBlock => tableBlock.getLocations.foreach(
- location => {
- if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
- val currentCount = blockMap.get(location)
- if (currentCount == null) {
- blockMap.put(location, 1)
- } else {
- blockMap.put(location, currentCount + 1)
- }
+ if (isTableSplitPartition) {
+ // for table split partition
+ val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
+ val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
+ location
+ } else {
+ // for node partition
+ val theSplit = split.asInstanceOf[CarbonNodePartition]
+ val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+ logInfo("Preferred Location for split: " + firstOptionLocation.head)
+ val blockMap = new util.LinkedHashMap[String, Integer]()
+ val tableBlocks = theSplit.blocksDetails
+ tableBlocks.foreach { tableBlock =>
+ tableBlock.getLocations.foreach { location =>
+ if (!firstOptionLocation.exists(location.equalsIgnoreCase)) {
+ val currentCount = blockMap.get(location)
+ if (currentCount == null) {
+ blockMap.put(location, 1)
+ } else {
+ blockMap.put(location, currentCount + 1)
}
}
- )
- )
-
- val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
- nodeCount1.getValue > nodeCount2.getValue
}
- )
+ }
+
+ val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
+ nodeCount1.getValue > nodeCount2.getValue
+ }
+ )
- val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
- firstOptionLocation ++ sortedNodesList
+ val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+ firstOptionLocation ++ sortedNodesList
}
}
+
}
/**
* Use this RDD class to load RDD
+ *
* @param sc
* @param result
* @param carbonLoadModel
@@ -512,7 +507,7 @@ class DataFrameLoaderRDD[K, V](
var partitionID = "0"
val loadMetadataDetails = new LoadMetadataDetails()
var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
- theSplit.index
+ theSplit.index
try {
loadMetadataDetails.setPartitionCount(partitionID)
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
@@ -521,14 +516,14 @@ class DataFrameLoaderRDD[K, V](
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
kettleHomePath, loadCount, loadMetadataDetails)
- loader.initialize
+ loader.initialize()
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
val rddIteratorKey = UUID.randomUUID().toString
- try{
+ try {
RddInputUtils.put(rddIteratorKey,
new RddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel))
carbonLoadModel.setRddIteratorKey(rddIteratorKey)
- loader.run
+ loader.run()
} finally {
RddInputUtils.remove(rddIteratorKey)
}
@@ -540,6 +535,7 @@ class DataFrameLoaderRDD[K, V](
}
var finished = false
+
override def hasNext: Boolean = !finished
override def next(): (K, V) = {
@@ -556,11 +552,12 @@ class DataFrameLoaderRDD[K, V](
/**
* This class wrap Scala's Iterator to Java's Iterator.
* It also convert all columns to string data to use csv data loading flow.
+ *
* @param rddIter
* @param carbonLoadModel
*/
class RddIterator(rddIter: Iterator[Row],
- carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[String]] {
+ carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[String]] {
val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
val format = new SimpleDateFormat(formatString)
@@ -570,9 +567,10 @@ class RddIterator(rddIter: Iterator[Row],
def hasNext: Boolean = rddIter.hasNext
private def getString(value: Any, level: Int = 1): String = {
- value == null match {
- case true => ""
- case false => value match {
+ if (value == null) {
+ ""
+ } else {
+ value match {
case s: String => s
case i: java.lang.Integer => i.toString
case d: java.lang.Double => d.toString
@@ -623,4 +621,5 @@ class RddIterator(rddIter: Iterator[Row],
def remove(): Unit = {
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 4392efe..1382efa 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -32,7 +32,8 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{util => _, _}
import org.apache.spark.sql.{CarbonEnv, DataFrame, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel,
+CompactionModel, Partitioner}
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.util.{FileUtils, SplitUtils}
@@ -44,7 +45,8 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable, CompactionType}
+import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable,
+CompactionType}
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.lcm.status.SegmentStatusManager
import org.apache.carbondata.processing.etl.DataLoadingException
@@ -56,6 +58,7 @@ import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
import org.apache.carbondata.spark.splits.TableSplit
import org.apache.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil}
+
/**
* This is the factory class which can create different RDD depends on user needs.
*
@@ -178,8 +181,12 @@ object CarbonDataRDDFactory extends Logging {
}
def configSplitMaxSize(context: SparkContext, filePaths: String,
- hadoopConfiguration: Configuration): Unit = {
- val defaultParallelism = if (context.defaultParallelism < 1) 1 else context.defaultParallelism
+ hadoopConfiguration: Configuration): Unit = {
+ val defaultParallelism = if (context.defaultParallelism < 1) {
+ 1
+ } else {
+ context.defaultParallelism
+ }
val spaceConsumed = FileUtils.getSpaceOccupied(filePaths)
val blockSize =
hadoopConfiguration.getLongBytes("dfs.blocksize", CarbonCommonConstants.CARBON_256MB)
@@ -191,30 +198,26 @@ object CarbonDataRDDFactory extends Logging {
newSplitSize = CarbonCommonConstants.CARBON_16MB
}
hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString)
- logInfo("totalInputSpaceConsumed : " + spaceConsumed +
- " , defaultParallelism : " + defaultParallelism)
- logInfo("mapreduce.input.fileinputformat.split.maxsize : " + newSplitSize.toString)
+ logInfo(s"totalInputSpaceConsumed: $spaceConsumed , defaultParallelism: $defaultParallelism")
+ logInfo(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString }")
}
}
def alterTableForCompaction(sqlContext: SQLContext,
- alterTableModel: AlterTableModel,
- carbonLoadModel: CarbonLoadModel, partitioner: Partitioner, storePath: String,
- kettleHomePath: String, storeLocation: String): Unit = {
+ alterTableModel: AlterTableModel,
+ carbonLoadModel: CarbonLoadModel, partitioner: Partitioner, storePath: String,
+ kettleHomePath: String, storeLocation: String): Unit = {
var compactionSize: Long = 0
var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
compactionType = CompactionType.MAJOR_COMPACTION
- }
- else {
+ } else {
compactionType = CompactionType.MINOR_COMPACTION
}
- logger
- .audit(s"Compaction request received for table " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
- )
+ logger.audit(s"Compaction request received for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
.getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
@@ -244,9 +247,7 @@ object CarbonDataRDDFactory extends Logging {
// if any other request comes at this time then it will create a compaction request file.
// so that this will be taken up by the compaction process which is executing.
if (!isConcurrentCompactionAllowed) {
- logger
- .info("System level compaction lock is enabled."
- )
+ logger.info("System level compaction lock is enabled.")
handleCompactionForSystemLocking(sqlContext,
carbonLoadModel,
partitioner,
@@ -257,8 +258,7 @@ object CarbonDataRDDFactory extends Logging {
carbonTable,
compactionModel
)
- }
- else {
+ } else {
// normal flow of compaction
val lock = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
@@ -266,10 +266,8 @@ object CarbonDataRDDFactory extends Logging {
)
if (lock.lockWithRetries()) {
- logger
- .info("Acquired the compaction lock for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ logger.info("Acquired the compaction lock for table" +
+ s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
@@ -280,45 +278,37 @@ object CarbonDataRDDFactory extends Logging {
compactionModel,
lock
)
- }
- catch {
- case e : Exception =>
- logger.error("Exception in start compaction thread. " + e.getMessage)
+ } catch {
+ case e: Exception =>
+ logger.error(s"Exception in start compaction thread. ${ e.getMessage }")
lock.unlock()
}
- }
- else {
- logger
- .audit("Not able to acquire the compaction lock for table " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
- )
- logger
- .error("Not able to acquire the compaction lock for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ } else {
+ logger.audit("Not able to acquire the compaction lock for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ logger.error(s"Not able to acquire the compaction lock for table" +
+ s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
sys.error("Table is already locked for compaction. Please try after some time.")
}
}
}
def handleCompactionForSystemLocking(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- partitioner: Partitioner,
- storePath: String,
- kettleHomePath: String,
- storeLocation: String,
- compactionType: CompactionType,
- carbonTable: CarbonTable,
- compactionModel: CompactionModel): Unit = {
+ carbonLoadModel: CarbonLoadModel,
+ partitioner: Partitioner,
+ storePath: String,
+ kettleHomePath: String,
+ storeLocation: String,
+ compactionType: CompactionType,
+ carbonTable: CarbonTable,
+ compactionModel: CompactionModel): Unit = {
val lock = CarbonLockFactory
.getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
)
if (lock.lockWithRetries()) {
- logger
- .info("Acquired the compaction lock for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ logger.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
+ s".${ carbonLoadModel.getTableName }")
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
@@ -329,50 +319,43 @@ object CarbonDataRDDFactory extends Logging {
compactionModel,
lock
)
- }
- catch {
- case e : Exception =>
- logger.error("Exception in start compaction thread. " + e.getMessage)
+ } catch {
+ case e: Exception =>
+ logger.error(s"Exception in start compaction thread. ${ e.getMessage }")
lock.unlock()
// if the compaction is a blocking call then only need to throw the exception.
if (compactionModel.isDDLTrigger) {
throw e
}
}
- }
- else {
- logger
- .audit("Not able to acquire the system level compaction lock for table " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
- )
- logger
- .error("Not able to acquire the compaction lock for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ } else {
+ logger.audit("Not able to acquire the system level compaction lock for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ logger.error("Not able to acquire the compaction lock for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
CarbonCompactionUtil
.createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
// do sys error only in case of DDL trigger.
- if(compactionModel.isDDLTrigger) {
- sys.error("Compaction is in progress, compaction request for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName + " is in queue.")
- }
- else {
- logger
- .error("Compaction is in progress, compaction request for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName + " is in queue."
- )
+ if (compactionModel.isDDLTrigger) {
+ sys.error("Compaction is in progress, compaction request for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+ " is in queue.")
+ } else {
+ logger.error("Compaction is in progress, compaction request for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+ " is in queue.")
}
}
}
def executeCompaction(carbonLoadModel: CarbonLoadModel,
- storePath: String,
- compactionModel: CompactionModel,
- partitioner: Partitioner,
- executor: ExecutorService,
- sqlContext: SQLContext,
- kettleHomePath: String,
- storeLocation: String): Unit = {
+ storePath: String,
+ compactionModel: CompactionModel,
+ partitioner: Partitioner,
+ executor: ExecutorService,
+ sqlContext: SQLContext,
+ kettleHomePath: String,
+ storeLocation: String): Unit = {
val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
carbonLoadModel.getLoadMetadataDetails
)
@@ -413,10 +396,9 @@ object CarbonDataRDDFactory extends Logging {
future.get
}
)
- }
- catch {
+ } catch {
case e: Exception =>
- logger.error("Exception in compaction thread " + e.getMessage)
+ logger.error(s"Exception in compaction thread ${ e.getMessage }")
throw e
}
@@ -442,22 +424,23 @@ object CarbonDataRDDFactory extends Logging {
)
}
}
+
/**
* This will submit the loads to be merged into the executor.
*
* @param futureList
*/
def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
- loadsToMerge: util
- .List[LoadMetadataDetails],
- executor: ExecutorService,
- storePath: String,
- sqlContext: SQLContext,
- compactionModel: CompactionModel,
- kettleHomePath: String,
- carbonLoadModel: CarbonLoadModel,
- partitioner: Partitioner,
- storeLocation: String): Unit = {
+ loadsToMerge: util
+ .List[LoadMetadataDetails],
+ executor: ExecutorService,
+ storePath: String,
+ sqlContext: SQLContext,
+ compactionModel: CompactionModel,
+ kettleHomePath: String,
+ carbonLoadModel: CarbonLoadModel,
+ partitioner: Partitioner,
+ storeLocation: String): Unit = {
loadsToMerge.asScala.foreach(seg => {
logger.info("loads identified for merge is " + seg.getLoadName)
@@ -484,13 +467,13 @@ object CarbonDataRDDFactory extends Logging {
}
def startCompactionThreads(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- partitioner: Partitioner,
- storePath: String,
- kettleHomePath: String,
- storeLocation: String,
- compactionModel: CompactionModel,
- compactionLock: ICarbonLock): Unit = {
+ carbonLoadModel: CarbonLoadModel,
+ partitioner: Partitioner,
+ storePath: String,
+ kettleHomePath: String,
+ storeLocation: String,
+ compactionModel: CompactionModel,
+ compactionLock: ICarbonLock): Unit = {
val executor: ExecutorService = Executors.newFixedThreadPool(1)
// update the updated table status.
readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -499,138 +482,123 @@ object CarbonDataRDDFactory extends Logging {
// clean up of the stale segments.
try {
CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
- }
- catch {
+ } catch {
case e: Exception =>
- logger
- .error("Exception in compaction thread while clean up of stale segments " + e
- .getMessage
- )
+ logger.error(s"Exception in compaction thread while clean up of stale segments" +
+ s" ${ e.getMessage }")
}
- val compactionThread = new Thread {
- override def run(): Unit = {
+ val compactionThread = new Thread {
+ override def run(): Unit = {
+ try {
+ // compaction status of the table which is triggered by the user.
+ var triggeredCompactionStatus = false
+ var exception: Exception = null
try {
- // compaction status of the table which is triggered by the user.
- var triggeredCompactionStatus = false
- var exception : Exception = null
- try {
- executeCompaction(carbonLoadModel: CarbonLoadModel,
- storePath: String,
- compactionModel: CompactionModel,
- partitioner: Partitioner,
- executor, sqlContext, kettleHomePath, storeLocation
+ executeCompaction(carbonLoadModel: CarbonLoadModel,
+ storePath: String,
+ compactionModel: CompactionModel,
+ partitioner: Partitioner,
+ executor, sqlContext, kettleHomePath, storeLocation
+ )
+ triggeredCompactionStatus = true
+ } catch {
+ case e: Exception =>
+ logger.error(s"Exception in compaction thread ${ e.getMessage }")
+ exception = e
+ }
+ // continue in case of exception also, check for all the tables.
+ val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+ CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+ ).equalsIgnoreCase("true")
+
+ if (!isConcurrentCompactionAllowed) {
+ logger.info("System level compaction lock is enabled.")
+ val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
+ var tableForCompaction = CarbonCompactionUtil
+ .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
+ .tablesMeta.toArray, skipCompactionTables.toList.asJava
)
- triggeredCompactionStatus = true
- }
- catch {
- case e: Exception =>
- logger.error("Exception in compaction thread " + e.getMessage)
- exception = e
- }
- // continue in case of exception also, check for all the tables.
- val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
- CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
- ).equalsIgnoreCase("true")
-
- if (!isConcurrentCompactionAllowed) {
- logger.info("System level compaction lock is enabled.")
- val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
- var tableForCompaction = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
- .tablesMeta.toArray, skipCompactionTables.toList.asJava
+ while (null != tableForCompaction) {
+ logger.info("Compaction request has been identified for table " +
+ s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+ s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ val table: CarbonTable = tableForCompaction.carbonTable
+ val metadataPath = table.getMetaDataFilepath
+ val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
+
+ val newCarbonLoadModel = new CarbonLoadModel()
+ prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
+ val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+ newCarbonLoadModel.getTableName
)
- while (null != tableForCompaction) {
- logger
- .info("Compaction request has been identified for table " + tableForCompaction
- .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier
- .getTableName
- )
- val table: CarbonTable = tableForCompaction.carbonTable
- val metadataPath = table.getMetaDataFilepath
- val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
-
- val newCarbonLoadModel = new CarbonLoadModel()
- prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
- val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
- .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
- newCarbonLoadModel.getTableName
- )
-
- val compactionSize = CarbonDataMergerUtil
- .getCompactionSize(CompactionType.MAJOR_COMPACTION)
-
- val newcompactionModel = CompactionModel(compactionSize,
- compactionType,
- table,
- tableCreationTime,
- compactionModel.isDDLTrigger
+
+ val compactionSize = CarbonDataMergerUtil
+ .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+
+ val newcompactionModel = CompactionModel(compactionSize,
+ compactionType,
+ table,
+ tableCreationTime,
+ compactionModel.isDDLTrigger
+ )
+ // proceed for compaction
+ try {
+ executeCompaction(newCarbonLoadModel,
+ newCarbonLoadModel.getStorePath,
+ newcompactionModel,
+ partitioner,
+ executor, sqlContext, kettleHomePath, storeLocation
)
- // proceed for compaction
- try {
- executeCompaction(newCarbonLoadModel,
- newCarbonLoadModel.getStorePath,
- newcompactionModel,
- partitioner,
- executor, sqlContext, kettleHomePath, storeLocation
- )
+ } catch {
+ case e: Exception =>
+ logger.error("Exception in compaction thread for table " +
+ s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+ s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ // not handling the exception; only logging, as this is not the table triggered
+ // by the user.
+ } finally {
+ // delete the compaction required file whether the compaction failed or succeeded.
+ if (!CarbonCompactionUtil
+ .deleteCompactionRequiredFile(metadataPath, compactionType)) {
+ // if the compaction request file cannot be deleted then
+ // add the table details to the skip list so that it won't be considered next.
+ skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
+ logger.error("Compaction request file can not be deleted for table " +
+ s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+ s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
}
- catch {
- case e: Exception =>
- logger.error("Exception in compaction thread for table " + tableForCompaction
- .carbonTable.getDatabaseName + "." +
- tableForCompaction.carbonTableIdentifier
- .getTableName)
- // not handling the exception. only logging as this is not the table triggered
- // by user.
- }
- finally {
- // delete the compaction required file in case of failure or success also.
- if (!CarbonCompactionUtil
- .deleteCompactionRequiredFile(metadataPath, compactionType)) {
- // if the compaction request file is not been able to delete then
- // add those tables details to the skip list so that it wont be considered next.
- skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
- logger
- .error("Compaction request file can not be deleted for table " +
- tableForCompaction
- .carbonTable.getDatabaseName + "." + tableForCompaction
- .carbonTableIdentifier
- .getTableName
- )
-
- }
- }
- // ********* check again for all the tables.
- tableForCompaction = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
- .tablesMeta.toArray, skipCompactionTables.asJava
- )
- }
- // giving the user his error for telling in the beeline if his triggered table
- // compaction is failed.
- if (!triggeredCompactionStatus) {
- throw new Exception("Exception in compaction " + exception.getMessage)
}
+ // ********* check again for all the tables.
+ tableForCompaction = CarbonCompactionUtil
+ .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
+ .tablesMeta.toArray, skipCompactionTables.asJava
+ )
+ }
+ // surface the error to the user (e.g. in beeline) if the user-triggered table
+ // compaction has failed.
+ if (!triggeredCompactionStatus) {
+ throw new Exception("Exception in compaction " + exception.getMessage)
}
}
- finally {
- executor.shutdownNow()
- deletePartialLoadsInCompaction(carbonLoadModel)
- compactionLock.unlock()
- }
+ } finally {
+ executor.shutdownNow()
+ deletePartialLoadsInCompaction(carbonLoadModel)
+ compactionLock.unlock()
}
}
+ }
// calling the run method of the thread to make this a blocking call.
// in the future we may make this concurrent.
compactionThread.run()
}
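Note on the blocking call above: invoking run() directly on compactionThread executes the body synchronously on the calling thread, unlike start(), which would spawn a new thread. A minimal sketch of the difference, using only a hypothetical task:

    object BlockingThreadDemo {
      def main(args: Array[String]): Unit = {
        val worker = new Thread(new Runnable {
          override def run(): Unit = println(s"running on ${Thread.currentThread().getName}")
        })
        worker.run()    // blocking: runs inline and prints the caller's thread name (e.g. "main")
        worker.start()  // asynchronous: prints the spawned thread's name
        worker.join()
      }
    }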
def prepareCarbonLoadModel(storePath: String,
- table: CarbonTable,
- newCarbonLoadModel: CarbonLoadModel): Unit = {
+ table: CarbonTable,
+ newCarbonLoadModel: CarbonLoadModel): Unit = {
newCarbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
newCarbonLoadModel.setTableName(table.getFactTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
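Earlier in this hunk the skip list is updated with skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier); +=: is ListBuffer's prepend operator and can also be written infix. A small illustration with placeholder table names:

    import scala.collection.mutable.ListBuffer

    object PrependDemo {
      def main(args: Array[String]): Unit = {
        val skipCompactionTables = ListBuffer("tableB")
        "tableA" +=: skipCompactionTables   // same as skipCompactionTables.+=:("tableA")
        println(skipCompactionTables)       // ListBuffer(tableA, tableB)
      }
    }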
@@ -651,13 +619,10 @@ object CarbonDataRDDFactory extends Logging {
// so deleting those folders.
try {
CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
- }
- catch {
+ } catch {
case e: Exception =>
- logger
- .error("Exception in compaction thread while clean up of stale segments " + e
- .getMessage
- )
+ logger.error(s"Exception in compaction thread while clean up of stale segments" +
+ s" ${ e.getMessage }")
}
}
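The log statements in this hunk are converted from + concatenation to s-interpolated strings; the two forms produce identical output. A self-contained illustration (the variable names here are only examples):

    object InterpolationDemo {
      def main(args: Array[String]): Unit = {
        val databaseName = "default"
        val tableName = "carbon_table"
        // concatenation style being removed
        val oldStyle = "Exception in compaction thread for table " + databaseName + "." + tableName
        // interpolation style being introduced
        val newStyle = s"Exception in compaction thread for table $databaseName.$tableName"
        assert(oldStyle == newStyle)
        println(newStyle)
      }
    }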
@@ -674,13 +639,11 @@ object CarbonDataRDDFactory extends Logging {
val isAgg = false
// for handling of the segment Merging.
def handleSegmentMerging(tableCreationTime: Long): Unit = {
- logger
- .info("compaction need status is " + CarbonDataMergerUtil.checkIfAutoLoadMergingRequired())
+ logger.info(s"compaction need status is" +
+ s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
- logger
- .audit("Compaction request received for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ logger.audit(s"Compaction request received for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val compactionSize = 0
val isCompactionTriggerByDDl = false
val compactionModel = CompactionModel(compactionSize,
@@ -717,8 +680,7 @@ object CarbonDataRDDFactory extends Logging {
carbonTable,
compactionModel
)
- }
- else {
+ } else {
val lock = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
LockUsage.COMPACTION_LOCK
@@ -736,37 +698,34 @@ object CarbonDataRDDFactory extends Logging {
compactionModel,
lock
)
- }
- catch {
- case e : Exception =>
- logger.error("Exception in start compaction thread. " + e.getMessage)
+ } catch {
+ case e: Exception =>
+ logger.error(s"Exception in start compaction thread. ${ e.getMessage }")
lock.unlock()
throw e
}
- }
- else {
- logger
- .audit("Not able to acquire the compaction lock for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
- logger
- .error("Not able to acquire the compaction lock for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ } else {
+ logger.audit("Not able to acquire the compaction lock for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${
+ carbonLoadModel
+ .getTableName
+ }")
+ logger.error("Not able to acquire the compaction lock for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${
+ carbonLoadModel
+ .getTableName
+ }")
}
}
}
}
try {
- logger
- .audit("Data load request has been received for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ logger.audit(s"Data load request has been received for table" +
+ s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
if (!useKettle) {
- logger.audit("Data is loading with New Data Flow for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ logger.audit("Data is loading with New Data Flow for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
// Check if any load needs to be deleted before loading new data
deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, partitioner, storePath,
@@ -801,13 +760,10 @@ object CarbonDataRDDFactory extends Logging {
// so deleting those folders.
try {
CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
- }
- catch {
+ } catch {
case e: Exception =>
logger
- .error("Exception in data load while clean up of stale segments " + e
- .getMessage
- )
+ .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
}
// reading the start time of data load.
@@ -826,14 +782,14 @@ object CarbonDataRDDFactory extends Logging {
var blocksGroupBy: Array[(String, Array[BlockDetails])] = null
var status: Array[(String, LoadMetadataDetails)] = null
- def loadDataFile(): Unit = { isTableSplitPartition match {
- case true =>
+ def loadDataFile(): Unit = {
+ if (isTableSplitPartition) {
/*
- * when data handle by table split partition
- * 1) get partition files, direct load or not will get the different files path
- * 2) get files blocks by using SplitUtils
- * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy
- */
+ * when data is handled by table split partition
+ * 1) get partition files; direct load or not determines the different file paths
+ * 2) get file blocks by using SplitUtils
+ * 3) output Array[(partitionID, Array[BlockDetails])] to blocksGroupBy
+ */
var splits = Array[TableSplit]()
if (carbonLoadModel.isDirectLoad) {
// get all table Splits, this part means files were divide to different partitions
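The loadDataFile change at the top of this hunk replaces a match on a Boolean with a plain if/else; behaviour is unchanged. A sketch of the two equivalent forms (the names are illustrative only):

    object BooleanMatchDemo {
      def describe(isTableSplitPartition: Boolean): String = {
        // style removed by the patch
        val viaMatch = isTableSplitPartition match {
          case true => "table split partition"
          case false => "node partition"
        }
        // style introduced by the patch
        val viaIf = if (isTableSplitPartition) "table split partition" else "node partition"
        require(viaMatch == viaIf)
        viaIf
      }

      def main(args: Array[String]): Unit = println(describe(isTableSplitPartition = true))
    }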
@@ -865,7 +821,7 @@ object CarbonDataRDDFactory extends Logging {
val pathBuilder = new StringBuilder()
pathBuilder.append(carbonLoadModel.getFactFilePath)
if (!carbonLoadModel.getFactFilePath.endsWith("/")
- && !carbonLoadModel.getFactFilePath.endsWith("\\")) {
+ && !carbonLoadModel.getFactFilePath.endsWith("\\")) {
pathBuilder.append("/")
}
pathBuilder.append(split.getPartition.getUniqueID).append("/")
@@ -873,16 +829,15 @@ object CarbonDataRDDFactory extends Logging {
SplitUtils.getSplits(pathBuilder.toString, sqlContext.sparkContext))
}
}
-
- case false =>
+ } else {
/*
- * when data load handle by node partition
- * 1)clone the hadoop configuration,and set the file path to the configuration
- * 2)use NewHadoopRDD to get split,size:Math.max(minSize, Math.min(maxSize, blockSize))
- * 3)use DummyLoadRDD to group blocks by host,and let spark balance the block location
- * 4)DummyLoadRDD output (host,Array[BlockDetails])as the parameter to CarbonDataLoadRDD
- * which parititon by host
- */
+ * when data load is handled by node partition
+ * 1) clone the hadoop configuration, and set the file path in the configuration
+ * 2) use NewHadoopRDD to get splits, size: Math.max(minSize, Math.min(maxSize, blockSize))
+ * 3) use DummyLoadRDD to group blocks by host, and let spark balance the block location
+ * 4) DummyLoadRDD outputs (host, Array[BlockDetails]) as the parameter to CarbonDataLoadRDD,
+ * which partitions by host
+ */
val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
// FileUtils will skip files which are not csv, and return all file paths split by ','
val filePaths = carbonLoadModel.getFactFilePath
@@ -921,9 +876,11 @@ object CarbonDataRDDFactory extends Logging {
.nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
.toSeq
val timeElapsed: Long = System.currentTimeMillis - startTime
- logInfo("Total Time taken in block allocation : " + timeElapsed)
- logInfo("Total no of blocks : " + blockList.size
- + ", No.of Nodes : " + nodeBlockMapping.size
+ logInfo("Total Time taken in block allocation: " + timeElapsed)
+ logInfo(s"Total no of blocks: ${ blockList.length }, No.of Nodes: ${
+ nodeBlockMapping
+ .size
+ }"
)
var str = ""
nodeBlockMapping.foreach(entry => {
@@ -983,7 +940,7 @@ object CarbonDataRDDFactory extends Logging {
var rdd = dataFrame.get.rdd
var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length
numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length))
- rdd = rdd.coalesce(numPartitions, false)
+ rdd = rdd.coalesce(numPartitions, shuffle = false)
status = new DataFrameLoaderRDD(sqlContext.sparkContext,
new DataLoadResultImpl(),
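rdd.coalesce(numPartitions, shuffle = false) above names the Boolean argument instead of passing a bare false, which documents intent at the call site. A self-contained sketch of the same idea with an illustrative function (not a Spark API):

    object NamedArgumentDemo {
      // stand-in for an API that takes a Boolean flag
      def coalesceLike(numPartitions: Int, shuffle: Boolean): String =
        s"numPartitions=$numPartitions, shuffle=$shuffle"

      def main(args: Array[String]): Unit = {
        println(coalesceLike(4, shuffle = false)) // the named flag makes the call self-explanatory
      }
    }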
@@ -1061,37 +1018,34 @@ object CarbonDataRDDFactory extends Logging {
CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
logInfo("********clean up done**********")
logger.audit(s"Data load is failed for " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
logWarning("Cannot write load metadata file as data load failed")
throw new Exception(errorMessage)
} else {
- val metadataDetails = status(0)._2
- if (!isAgg) {
- val status = CarbonLoaderUtil
- .recordLoadMetadata(currentLoadCount,
- metadataDetails,
- carbonLoadModel,
- loadStatus,
- loadStartTime
- )
- if (!status) {
- val errorMessage = "Dataload failed due to failure in table status updation."
- logger.audit("Data load is failed for " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
- logger.error("Dataload failed due to failure in table status updation.")
- throw new Exception(errorMessage)
- }
- } else if (!carbonLoadModel.isRetentionRequest) {
- // TODO : Handle it
- logInfo("********Database updated**********")
+ val metadataDetails = status(0)._2
+ if (!isAgg) {
+ val status = CarbonLoaderUtil.recordLoadMetadata(currentLoadCount, metadataDetails,
+ carbonLoadModel, loadStatus, loadStartTime)
+ if (!status) {
+ val errorMessage = "Dataload failed due to failure in table status updation."
+ logger.audit("Data load is failed for " +
+ s"${ carbonLoadModel.getDatabaseName }.${
+ carbonLoadModel
+ .getTableName
+ }")
+ logger.error("Dataload failed due to failure in table status updation.")
+ throw new Exception(errorMessage)
}
- logger.audit("Data load is successful for " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
+ } else if (!carbonLoadModel.isRetentionRequest) {
+ // TODO : Handle it
+ logInfo("********Database updated**********")
+ }
+ logger.audit("Data load is successful for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
// compaction handling
handleSegmentMerging(tableCreationTime)
- }
- catch {
+ } catch {
case e: Exception =>
throw new Exception(
"Dataload is success. Auto-Compaction has failed. Please check logs.")
@@ -1111,10 +1065,10 @@ object CarbonDataRDDFactory extends Logging {
}
def deleteLoadsAndUpdateMetadata(
- carbonLoadModel: CarbonLoadModel,
- table: CarbonTable, partitioner: Partitioner,
- storePath: String,
- isForceDeletion: Boolean) {
+ carbonLoadModel: CarbonLoadModel,
+ table: CarbonTable, partitioner: Partitioner,
+ storePath: String,
+ isForceDeletion: Boolean) {
if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
val loadMetadataFilePath = CarbonLoaderUtil
.extractLoadMetadataFileLocation(carbonLoadModel)
@@ -1132,36 +1086,34 @@ object CarbonDataRDDFactory extends Logging {
if (isUpdationRequired) {
try {
- // Update load metadate file after cleaning deleted nodes
- if (carbonTableStatusLock.lockWithRetries()) {
- logger.info("Table status lock has been successfully acquired.")
+ // Update load metadata file after cleaning deleted nodes
+ if (carbonTableStatusLock.lockWithRetries()) {
+ logger.info("Table status lock has been successfully acquired.")
- // read latest table status again.
- val latestMetadata = segmentStatusManager
- .readLoadMetadata(loadMetadataFilePath)
+ // read latest table status again.
+ val latestMetadata = segmentStatusManager.readLoadMetadata(loadMetadataFilePath)
- // update the metadata details from old to new status.
+ // update the metadata details from old to new status.
+ val latestStatus = CarbonLoaderUtil
+ .updateLoadMetadataFromOldToNew(details, latestMetadata)
- val latestStatus = CarbonLoaderUtil
- .updateLoadMetadataFromOldToNew(details, latestMetadata)
-
- CarbonLoaderUtil.writeLoadMetadata(
- carbonLoadModel.getCarbonDataLoadSchema,
- carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, latestStatus
- )
- }
- else {
- val errorMsg = "Clean files request is failed for " + carbonLoadModel.getDatabaseName +
- "." + carbonLoadModel.getTableName +
- ". Not able to acquire the table status lock due to other operation " +
- "running in the background."
- logger.audit(errorMsg)
- logger.error(errorMsg)
- throw new Exception(errorMsg + " Please try after some time.")
+ CarbonLoaderUtil.writeLoadMetadata(
+ carbonLoadModel.getCarbonDataLoadSchema,
+ carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName, latestStatus
+ )
+ } else {
+ val errorMsg = "Clean files request is failed for " +
+ s"${ carbonLoadModel.getDatabaseName }." +
+ s"${ carbonLoadModel.getTableName }" +
+ ". Not able to acquire the table status lock due to other operation " +
+ "running in the background."
+ logger.audit(errorMsg)
+ logger.error(errorMsg)
+ throw new Exception(errorMsg + " Please try after some time.")
- }
- } finally {
+ }
+ } finally {
CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
}
}
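The table status update above follows an acquire-with-retries / work / unlock-in-finally shape. A hedged sketch of that pattern with a hypothetical lock trait (the real CarbonLock and CarbonLockUtil APIs may differ):

    // hypothetical lock trait; the actual CarbonLock API may differ
    trait SimpleLock {
      def lockWithRetries(): Boolean
      def unlock(): Boolean
    }

    object LockPatternDemo {
      def withTableStatusLock[T](lock: SimpleLock)(body: => T): T = {
        try {
          if (lock.lockWithRetries()) {
            body // e.g. read the latest table status, merge details, write metadata back
          } else {
            throw new Exception("Not able to acquire the table status lock. Please try after some time.")
          }
        } finally {
          lock.unlock() // released whether or not the update succeeded
        }
      }
    }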
@@ -1197,10 +1149,9 @@ object CarbonDataRDDFactory extends Logging {
partitioner,
storePath,
isForceDeletion = true)
- }
- else {
- val errorMsg = "Clean files request is failed for " + carbonLoadModel.getDatabaseName +
- "." + carbonLoadModel.getTableName +
+ } else {
+ val errorMsg = "Clean files request is failed for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
". Not able to acquire the clean files lock due to another clean files " +
"operation is running in the background."
logger.audit(errorMsg)
@@ -1208,10 +1159,10 @@ object CarbonDataRDDFactory extends Logging {
throw new Exception(errorMsg + " Please try after some time.")
}
- }
- finally {
+ } finally {
CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index 8c52249..17b487c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -86,7 +86,7 @@ class CarbonDeleteLoadByDateRDD[K, V](
override def getPreferredLocations(split: Partition): Seq[String] = {
val theSplit = split.asInstanceOf[CarbonLoadPartition]
val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
- logInfo("Host Name : " + s.head + s.length)
+ logInfo("Host Name: " + s.head + s.length)
s
}
}
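A side note on the log line above: "Host Name: " + s.head + s.length concatenates the first host name and the number of locations with no separator, so the two values run together in the output. A possible interpolated form, shown only as an illustration and not part of this patch:

    object PreferredLocationsDemo {
      // illustrative only; not part of this patch
      def describePreferredLocations(locations: Seq[String]): String =
        s"Host Name: ${locations.head}, No. of locations: ${locations.length}"

      def main(args: Array[String]): Unit =
        println(describePreferredLocations(Seq("host1", "host2")))
    }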
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index df40ed7..57bf124 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -77,7 +77,7 @@ class CarbonDeleteLoadRDD[V: ClassTag](
override def getPreferredLocations(split: Partition): Seq[String] = {
val theSplit = split.asInstanceOf[CarbonLoadPartition]
val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
- logInfo("Host Name : " + s.head + s.length)
+ logInfo("Host Name: " + s.head + s.length)
s
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index b7da579..bce4eb2 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifie
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.load.CarbonLoaderUtil
@@ -67,6 +67,7 @@ trait GenericParser {
case class DictionaryStats(distinctValues: java.util.List[String],
dictWriteTime: Long, sortIndexWriteTime: Long)
+
case class PrimitiveParser(dimension: CarbonDimension,
setOpt: Option[HashSet[String]]) extends GenericParser {
val (hasDictEncoding, set: HashSet[String]) = setOpt match {
@@ -164,20 +165,21 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S
* A RDD to combine all dictionary distinct values.
*
* @constructor create a RDD with RDD[(String, Iterable[String])]
- * @param prev the input RDD[(String, Iterable[String])]
+ * @param prev the input RDD[(String, Iterable[String])]
* @param model a model package load info
*/
class CarbonAllDictionaryCombineRDD(
- prev: RDD[(String, Iterable[String])],
- model: DictionaryLoadModel)
+ prev: RDD[(String, Iterable[String])],
+ model: DictionaryLoadModel)
extends RDD[(Int, ColumnDistinctValues)](prev) with Logging {
- override def getPartitions: Array[Partition] =
+ override def getPartitions: Array[Partition] = {
firstParent[(String, Iterable[String])].partitions
+ }
override def compute(split: Partition, context: TaskContext
- ): Iterator[(Int, ColumnDistinctValues)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass().getName())
+ ): Iterator[(Int, ColumnDistinctValues)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
/*
@@ -240,7 +242,7 @@ class CarbonBlockDistinctValuesCombineRDD(
override def compute(split: Partition,
context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordLoadCsvfilesToDfTime()
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
var rowCount = 0L
try {
@@ -259,7 +261,7 @@ class CarbonBlockDistinctValuesCombineRDD(
}
}
}
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordLoadCsvfilesToDfTime()
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
} catch {
case ex: Exception =>
LOGGER.error(ex)
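This file also drops the empty parentheses on no-argument accessors (this.getClass.getName, getLoadStatisticsInstance), following the Scala convention of writing side-effect-free accessors without (). A small illustration with made-up members:

    object ParenConventionDemo {
      class Stats {
        // pure accessor: declared and called without parentheses
        def loadCsvTimeMillis: Long = 42L
        // side-effecting method keeps its parentheses
        def recordLoadTime(): Unit = println("recorded")
      }

      def main(args: Array[String]): Unit = {
        val stats = new Stats
        println(stats.loadCsvTimeMillis) // no parens for the pure accessor
        stats.recordLoadTime()           // parens signal the side effect
      }
    }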
@@ -288,7 +290,7 @@ class CarbonGlobalDictionaryGenerateRDD(
override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass().getName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
var isHighCardinalityColumn = false
val iter = new Iterator[(Int, String, Boolean)] {
@@ -303,11 +305,11 @@ class CarbonGlobalDictionaryGenerateRDD(
model.hdfsTempLocation)
}
if (StringUtils.isNotBlank(model.lockType)) {
- CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
- model.lockType)
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
+ model.lockType)
}
if (StringUtils.isNotBlank(model.zooKeeperUrl)) {
- CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
model.zooKeeperUrl)
}
val dictLock = CarbonLockFactory
@@ -320,7 +322,7 @@ class CarbonGlobalDictionaryGenerateRDD(
val valuesBuffer = new mutable.HashSet[String]
val rddIter = firstParent[(Int, ColumnDistinctValues)].iterator(split, context)
var rowCount = 0L
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDicShuffleAndWriteTime()
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
breakable {
while (rddIter.hasNext) {
val distinctValueList = rddIter.next()._2
@@ -329,7 +331,7 @@ class CarbonGlobalDictionaryGenerateRDD(
// check high cardinality
if (model.isFirstLoad && model.highCardIdentifyEnable
&& !model.isComplexes(split.index)
- && model.dimensions(split.index).isColumnar()) {
+ && model.dimensions(split.index).isColumnar) {
isHighCardinalityColumn = GlobalDictionaryUtil.isHighCardinalityColumn(
valuesBuffer.size, rowCount, model)
if (isHighCardinalityColumn) {
@@ -338,10 +340,13 @@ class CarbonGlobalDictionaryGenerateRDD(
}
}
}
- val combineListTime = (System.currentTimeMillis() - t1)
+ val combineListTime = System.currentTimeMillis() - t1
if (isHighCardinalityColumn) {
- LOGGER.info("column " + model.table.getTableUniqueName + "." +
- model.primDimensions(split.index).getColName + " is high cardinality column")
+ LOGGER.info(s"column ${ model.table.getTableUniqueName }." +
+ s"${
+ model.primDimensions(split.index)
+ .getColName
+ } is high cardinality column")
} else {
isDictionaryLocked = dictLock.lockWithRetries()
if (isDictionaryLocked) {
@@ -367,7 +372,7 @@ class CarbonGlobalDictionaryGenerateRDD(
} else {
null
}
- val dictCacheTime = (System.currentTimeMillis - t2)
+ val dictCacheTime = System.currentTimeMillis - t2
val t3 = System.currentTimeMillis()
val dictWriteTask = new DictionaryWriterTask(valuesBuffer,
dictionaryForDistinctValueLookUp,
@@ -375,7 +380,7 @@ class CarbonGlobalDictionaryGenerateRDD(
split.index)
// execute dictionary writer task to get distinct values
val distinctValues = dictWriteTask.execute()
- val dictWriteTime = (System.currentTimeMillis() - t3)
+ val dictWriteTime = System.currentTimeMillis() - t3
val t4 = System.currentTimeMillis()
// if new data came then rewrite sort index file
if (distinctValues.size() > 0) {
@@ -385,22 +390,21 @@ class CarbonGlobalDictionaryGenerateRDD(
distinctValues)
sortIndexWriteTask.execute()
}
- val sortIndexWriteTime = (System.currentTimeMillis() - t4)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDicShuffleAndWriteTime()
+ val sortIndexWriteTime = System.currentTimeMillis() - t4
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
// After sortIndex writing, update dictionaryMeta
dictWriteTask.updateMetaData()
// clear the value buffer after writing dictionary data
valuesBuffer.clear
- org.apache.carbondata.core.util.CarbonUtil
- .clearDictionaryCache(dictionaryForDistinctValueLookUp);
+ CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
dictionaryForDistinctValueLookUpCleared = true
- LOGGER.info("\n columnName:" + model.primDimensions(split.index).getColName +
- "\n columnId:" + model.primDimensions(split.index).getColumnId +
- "\n new distinct values count:" + distinctValues.size() +
- "\n combine lists:" + combineListTime +
- "\n create dictionary cache:" + dictCacheTime +
- "\n sort list, distinct and write:" + dictWriteTime +
- "\n write sort info:" + sortIndexWriteTime)
+ LOGGER.info(s"\n columnName: ${ model.primDimensions(split.index).getColName }" +
+ s"\n columnId: ${ model.primDimensions(split.index).getColumnId }" +
+ s"\n new distinct values count: ${ distinctValues.size() }" +
+ s"\n combine lists: $combineListTime" +
+ s"\n create dictionary cache: $dictCacheTime" +
+ s"\n sort list, distinct and write: $dictWriteTime" +
+ s"\n write sort info: $sortIndexWriteTime")
}
} catch {
case ex: Exception =>
@@ -408,11 +412,9 @@ class CarbonGlobalDictionaryGenerateRDD(
throw ex
} finally {
if (!dictionaryForDistinctValueLookUpCleared) {
- org.apache.carbondata.core.util.CarbonUtil
- .clearDictionaryCache(dictionaryForDistinctValueLookUp);
+ CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
}
- org.apache.carbondata.core.util.CarbonUtil
- .clearDictionaryCache(dictionaryForSortIndexWriting);
+ CarbonUtil.clearDictionaryCache(dictionaryForSortIndexWriting)
if (dictLock != null && isDictionaryLocked) {
if (dictLock.unlock()) {
logInfo(s"Dictionary ${
@@ -441,14 +443,17 @@ class CarbonGlobalDictionaryGenerateRDD(
(split.index, status, isHighCardinalityColumn)
}
}
+
iter
}
+
}
+
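The fully qualified org.apache.carbondata.core.util.CarbonUtil calls above are replaced by the CarbonUtil import added at the top of the file; the behaviour is identical. A sketch of the general pattern using a standard-library class instead of CarbonUtil:

    object ImportVsFullyQualifiedDemo {
      def main(args: Array[String]): Unit = {
        // fully qualified call
        val viaFqn = java.util.concurrent.TimeUnit.SECONDS.toMillis(1)

        // the same call through an import, as this patch does for CarbonUtil
        import java.util.concurrent.TimeUnit
        val viaImport = TimeUnit.SECONDS.toMillis(1)

        assert(viaFqn == viaImport)
      }
    }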
/**
* Set column dictionary partition format
*
- * @param id partition id
- * @param dimension current carbon dimension
+ * @param id partition id
+ * @param dimension current carbon dimension
*/
class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
extends Partition {
@@ -460,13 +465,13 @@ class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
/**
* Use external column dict to generate global dictionary
*
- * @param carbonLoadModel carbon load model
- * @param sparkContext spark context
- * @param table carbon table identifier
- * @param dimensions carbon dimenisons having predefined dict
- * @param hdfsLocation carbon base store path
+ * @param carbonLoadModel carbon load model
+ * @param sparkContext spark context
+ * @param table carbon table identifier
+ * @param dimensions carbon dimensions having predefined dict
+ * @param hdfsLocation carbon base store path
* @param dictFolderPath path of dictionary folder
-*/
+ */
class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
dictionaryLoadModel: DictionaryLoadModel,
sparkContext: SparkContext,
@@ -505,25 +510,25 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
} catch {
case ex: Exception =>
logError(s"Error in reading pre-defined " +
- s"dictionary file:${ex.getMessage}")
+ s"dictionary file:${ ex.getMessage }")
throw ex
} finally {
if (csvReader != null) {
try {
- csvReader.close
+ csvReader.close()
} catch {
case ex: Exception =>
logError(s"Error in closing csvReader of " +
- s"pre-defined dictionary file:${ex.getMessage}")
+ s"pre-defined dictionary file:${ ex.getMessage }")
}
}
if (inputStream != null) {
try {
- inputStream.close
+ inputStream.close()
} catch {
case ex: Exception =>
logError(s"Error in closing inputStream of " +
- s"pre-defined dictionary file:${ex.getMessage}")
+ s"pre-defined dictionary file:${ ex.getMessage }")
}
}
}
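The finally block above closes csvReader and inputStream separately, each in its own try/catch, so that a failure while closing one does not prevent closing the other. A hedged sketch of that close-quietly shape (the helper name is made up):

    import java.io.Closeable

    object CloseQuietlyDemo {
      // close a resource, logging (here: printing) rather than propagating any failure
      def closeQuietly(resource: Closeable, name: String): Unit = {
        if (resource != null) {
          try {
            resource.close()
          } catch {
            case ex: Exception => println(s"Error in closing $name: ${ex.getMessage}")
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val inputStream = new java.io.ByteArrayInputStream(Array[Byte](1, 2, 3))
        closeQuietly(inputStream, "pre-defined dictionary input stream")
      }
    }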