Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/19 02:16:52 UTC
[3/4] incubator-carbondata git commit: Improved spark module code.
Improved spark module code.
* Removed some compilation warnings.
* Replaced pattern matching on booleans with if-else (see the sketch below).
* Improved code according to Scala standards.
* Removed unnecessary new lines.
* Added string interpolation instead of string concatenation.
* Removed unnecessary semicolons.
* Fixed indentation.
* Added useKettle option for loading.
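The boolean pattern-matching and string-interpolation refactorings recur throughout the patch below. As a rough, hedged sketch (not copied from the diff; RefactorSketch, isDirectLoad, tableName and the local logInfo are placeholder names standing in for the real load-model fields and Spark's Logging.logInfo), the change pattern in Scala looks like this:

object RefactorSketch {
  // Placeholder values; the real code reads these from the CarbonLoadModel.
  val isDirectLoad: Boolean = true
  val tableName: String = "t3"

  // Stand-in for Spark's Logging.logInfo so the sketch compiles on its own.
  def logInfo(msg: String): Unit = println(msg)

  def before(): Unit = {
    // Old style removed by this commit: pattern matching on a Boolean,
    // plus string concatenation.
    isDirectLoad match {
      case true => logInfo("Direct load for table " + tableName)
      case false => logInfo("Normal load for table " + tableName)
    }
  }

  def after(): Unit = {
    // New style introduced by this commit: plain if-else and string interpolation.
    if (isDirectLoad) {
      logInfo(s"Direct load for table $tableName")
    } else {
      logInfo(s"Normal load for table $tableName")
    }
  }
}

The if-else form reads more directly than matching on a Boolean, and interpolation keeps each log message in a single readable string instead of a chain of concatenations.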
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6391c2be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6391c2be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6391c2be
Branch: refs/heads/master
Commit: 6391c2be31f347688a3dbe9f9657e3dd75158684
Parents: c5176f3
Author: Prabhat Kashyap <pr...@knoldus.in>
Authored: Wed Oct 19 22:24:47 2016 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Sat Nov 19 09:51:04 2016 +0800
----------------------------------------------------------------------
.../examples/AllDictionaryExample.scala | 2 +
.../carbondata/examples/CarbonExample.scala | 2 +
.../spark/sql/common/util/QueryTest.scala | 6 +-
.../spark/CarbonDataFrameWriter.scala | 4 +-
.../apache/carbondata/spark/CarbonFilters.scala | 34 +-
.../spark/rdd/CarbonCleanFilesRDD.scala | 2 +-
.../spark/rdd/CarbonDataLoadRDD.scala | 267 ++++----
.../spark/rdd/CarbonDataRDDFactory.scala | 619 +++++++++----------
.../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 2 +-
.../spark/rdd/CarbonDeleteLoadRDD.scala | 2 +-
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 101 +--
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 91 ++-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 57 +-
.../apache/carbondata/spark/rdd/Compactor.scala | 71 +--
.../spark/tasks/DictionaryWriterTask.scala | 10 +-
.../spark/thriftserver/CarbonThriftServer.scala | 2 +-
.../carbondata/spark/util/CommonUtil.scala | 14 +-
.../spark/util/DataTypeConverterUtil.scala | 2 +-
.../spark/util/GlobalDictionaryUtil.scala | 212 +++----
.../org/apache/spark/sql/CarbonContext.scala | 22 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 17 +-
.../org/apache/spark/sql/CarbonSqlParser.scala | 347 ++++++-----
.../spark/sql/SparkUnknownExpression.scala | 53 +-
.../execution/command/carbonTableSchema.scala | 270 ++++----
.../spark/sql/hive/CarbonMetastoreCatalog.scala | 65 +-
.../spark/sql/hive/DistributionUtil.scala | 35 +-
.../spark/sql/optimizer/CarbonOptimizer.scala | 13 +-
.../scala/org/apache/spark/util/FileUtils.scala | 10 +-
28 files changed, 1141 insertions(+), 1191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
index dcdf41f..9fecadb 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
@@ -21,6 +21,7 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.examples.util.{AllDictionaryUtil, ExampleUtils}
object AllDictionaryExample {
+
def main(args: Array[String]) {
val cc = ExampleUtils.createCarbonContext("CarbonExample")
val testData = ExampleUtils.currentPath + "/src/main/resources/data.csv"
@@ -57,4 +58,5 @@ object AllDictionaryExample {
// clean local dictionary files
AllDictionaryUtil.cleanDictionary(allDictFile)
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 038f609..f98d46d 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.examples.util.ExampleUtils
object CarbonExample {
+
def main(args: Array[String]) {
val cc = ExampleUtils.createCarbonContext("CarbonExample")
val testData = ExampleUtils.currentPath + "/src/main/resources/data.csv"
@@ -73,4 +74,5 @@ object CarbonExample {
// Drop table
cc.sql("DROP TABLE IF EXISTS t3")
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index f9960d3..587013f 100644
--- a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -140,7 +140,7 @@ object QueryTest {
|$e
|${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
""".stripMargin
- return Some(errorMessage)
+ Some(errorMessage)
}
if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
@@ -157,9 +157,9 @@ object QueryTest {
prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")
}
""".stripMargin
- return Some(errorMessage)
+ Some(errorMessage)
}
- return None
+ None
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index a02751e..3596393 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -126,8 +126,8 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) extends Logging {
options.tableName,
null,
Seq(),
- Map(("fileheader" -> header)),
- false,
+ Map("fileheader" -> header),
+ isOverwriteExist = false,
null,
Some(dataFrame)).run(cc)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 711c51c..3162f80 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -121,8 +121,8 @@ object CarbonFilters {
expr match {
case or@ Or(left, right) =>
- val leftFilter = translate(left, true)
- val rightFilter = translate(right, true)
+ val leftFilter = translate(left, or = true)
+ val rightFilter = translate(right, or = true)
if (leftFilter.isDefined && rightFilter.isDefined) {
Some( sources.Or(leftFilter.get, rightFilter.get))
} else {
@@ -265,29 +265,27 @@ object CarbonFilters {
Some(new EqualToExpression(transformExpression(child).get,
transformExpression(Literal(null)).get, true))
case Not(In(a: Attribute, list))
- if !list.exists(!_.isInstanceOf[Literal]) =>
- if (list.exists(x => (isNullLiteral(x.asInstanceOf[Literal])))) {
- Some(new FalseExpression(transformExpression(a).get))
- }
- else {
- Some(new NotInExpression(transformExpression(a).get,
+ if !list.exists(!_.isInstanceOf[Literal]) =>
+ if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+ Some(new FalseExpression(transformExpression(a).get))
+ } else {
+ Some(new NotInExpression(transformExpression(a).get,
new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
- }
+ }
case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
Some(new InExpression(transformExpression(a).get,
new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
case Not(In(Cast(a: Attribute, _), list))
if !list.exists(!_.isInstanceOf[Literal]) =>
- /* if any illogical expression comes in NOT IN Filter like
- NOT IN('scala',NULL) this will be treated as false expression and will
- always return no result. */
- if (list.exists(x => (isNullLiteral(x.asInstanceOf[Literal])))) {
- Some(new FalseExpression(transformExpression(a).get))
- }
- else {
- Some(new NotInExpression(transformExpression(a).get, new ListExpression(
+ /* if any illogical expression comes in NOT IN Filter like
+ NOT IN('scala',NULL) this will be treated as false expression and will
+ always return no result. */
+ if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+ Some(new FalseExpression(transformExpression(a).get))
+ } else {
+ Some(new NotInExpression(transformExpression(a).get, new ListExpression(
convertToJavaList(list.map(transformExpression(_).get)))))
- }
+ }
case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
Some(new InExpression(transformExpression(a).get,
new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index 3ba32d2..3a5d952 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -76,7 +76,7 @@ class CarbonCleanFilesRDD[V: ClassTag](
override def getPreferredLocations(split: Partition): Seq[String] = {
val theSplit = split.asInstanceOf[CarbonLoadPartition]
val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
- logInfo("Host Name : " + s.head + s.length)
+ logInfo("Host Name: " + s.head + s.length)
s
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 856e67c..2a36f30 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -26,7 +26,8 @@ import java.util.UUID
import scala.collection.JavaConverters._
import scala.util.Random
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
+TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.command.Partitioner
@@ -78,11 +79,11 @@ class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
}
class SparkPartitionLoader(model: CarbonLoadModel,
- splitIndex: Int,
- storePath: String,
- kettleHomePath: String,
- loadCount: Int,
- loadMetadataDetails: LoadMetadataDetails) extends Logging{
+ splitIndex: Int,
+ storePath: String,
+ kettleHomePath: String,
+ loadCount: Int,
+ loadMetadataDetails: LoadMetadataDetails) extends Logging {
var storeLocation: String = ""
@@ -106,7 +107,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
// container temp dir or is yarn application directory.
val carbonUseLocalDir = CarbonProperties.getInstance()
.getProperty("carbon.use.local.dir", "false")
- if(carbonUseLocalDir.equalsIgnoreCase("true")) {
+ if (carbonUseLocalDir.equalsIgnoreCase("true")) {
val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
if (null != storeLocations && storeLocations.nonEmpty) {
storeLocation = storeLocations(Random.nextInt(storeLocations.length))
@@ -114,8 +115,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
if (storeLocation == null) {
storeLocation = System.getProperty("java.io.tmpdir")
}
- }
- else {
+ } else {
storeLocation = System.getProperty("java.io.tmpdir")
}
storeLocation = storeLocation + '/' + System.nanoTime() + '/' + splitIndex
@@ -127,7 +127,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
kettleHomePath)
} catch {
case e: DataLoadingException => if (e.getErrorCode ==
- DataProcessorConstants.BAD_REC_FOUND) {
+ DataProcessorConstants.BAD_REC_FOUND) {
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
logInfo("Bad Record Found")
} else {
@@ -160,6 +160,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
}
}
}
+
/**
* Use this RDD class to load csv data file
*
@@ -171,7 +172,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
* @param partitioner Partitioner which specify how to partition
* @param columinar whether it is columinar
* @param loadCount Current load count
- * @param tableCreationTime Time of creating table
+ * @param tableCreationTime Time of creating table
* @param schemaLastUpdatedTime Time of last schema update
* @param blocksGroupBy Blocks Array which is group by partition or host
* @param isTableSplitPartition Whether using table split partition
@@ -195,30 +196,29 @@ class DataFileLoaderRDD[K, V](
sc.setLocalProperty("spark.scheduler.pool", "DDL")
override def getPartitions: Array[Partition] = {
- isTableSplitPartition match {
- case true =>
- // for table split partition
- var splits = Array[TableSplit]()
- if (carbonLoadModel.isDirectLoad) {
- splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
- partitioner.nodeList, partitioner.partitionCount)
- }
- else {
- splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, null, partitioner)
- }
+ if (isTableSplitPartition) {
+ // for table split partition
+ var splits = Array[TableSplit]()
+ if (carbonLoadModel.isDirectLoad) {
+ splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
+ partitioner.nodeList, partitioner.partitionCount)
+ } else {
+ splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName, null, partitioner)
+ }
- splits.zipWithIndex.map {s =>
- // filter the same partition unique id, because only one will match, so get 0 element
- val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
- p._1 == s._1.getPartition.getUniqueID)(0)._2
- new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
- }
- case false =>
- // for node partition
- blocksGroupBy.zipWithIndex.map{b =>
- new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
- }
+ splits.zipWithIndex.map { case (split, index) =>
+ // filter the same partition unique id, because only one will match, so get 0 element
+ val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter { case (uniqueId, _) =>
+ uniqueId == split.getPartition.getUniqueID
+ }(0)._2
+ new CarbonTableSplitPartition(id, index, split, blocksDetails)
+ }
+ } else {
+ // for node partition
+ blocksGroupBy.zipWithIndex.map { case ((uniqueId, blockDetails), index) =>
+ new CarbonNodePartition(id, index, uniqueId, blockDetails)
+ }
}
}
@@ -242,16 +242,14 @@ class DataFileLoaderRDD[K, V](
setModelAndBlocksInfo()
val loader = new SparkPartitionLoader(model, theSplit.index, storePath,
kettleHomePath, loadCount, loadMetadataDetails)
- loader.initialize
+ loader.initialize()
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
if (model.isRetentionRequest) {
recreateAggregationTableForRetention
- }
- else if (model.isAggLoadRequest) {
+ } else if (model.isAggLoadRequest) {
loadMetadataDetails.setLoadStatus(createManualAggregateTable)
- }
- else {
- loader.run
+ } else {
+ loader.run()
}
} catch {
case e: Exception =>
@@ -261,52 +259,50 @@ class DataFileLoaderRDD[K, V](
}
def setModelAndBlocksInfo(): Unit = {
- isTableSplitPartition match {
- case true =>
- // for table split partition
- val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
- logInfo("Input split: " + split.serializableHadoopSplit.value)
- val blocksID = gernerateBlocksID
- carbonLoadModel.setBlocksID(blocksID)
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- if (carbonLoadModel.isDirectLoad) {
- model = carbonLoadModel.getCopyWithPartition(
- split.serializableHadoopSplit.value.getPartition.getUniqueID,
- split.serializableHadoopSplit.value.getPartition.getFilesPath,
- carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
- } else {
- model = carbonLoadModel.getCopyWithPartition(
- split.serializableHadoopSplit.value.getPartition.getUniqueID)
- }
- partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
- // get this partition data blocks and put it to global static map
- GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail)
- StandardLogService.setThreadName(partitionID, null)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordPartitionBlockMap(
- partitionID, split.partitionBlocksDetail.length)
- case false =>
- // for node partition
- val split = theSplit.asInstanceOf[CarbonNodePartition]
- logInfo("Input split: " + split.serializableHadoopSplit)
- logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordHostBlockMap(
- split.serializableHadoopSplit, split.nodeBlocksDetail.length)
- val blocksID = gernerateBlocksID
- carbonLoadModel.setBlocksID(blocksID)
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- // set this node blocks info to global static map
- GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail)
- if (carbonLoadModel.isDirectLoad) {
- val filelist: java.util.List[String] = new java.util.ArrayList[String](
- CarbonCommonConstants.CONSTANT_SIZE_TEN)
- CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
- model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
- carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
- }
- else {
- model = carbonLoadModel.getCopyWithPartition(partitionID)
- }
- StandardLogService.setThreadName(blocksID, null)
+ if (isTableSplitPartition) {
+ // for table split partition
+ val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
+ logInfo("Input split: " + split.serializableHadoopSplit.value)
+ val blocksID = gernerateBlocksID
+ carbonLoadModel.setBlocksID(blocksID)
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ if (carbonLoadModel.isDirectLoad) {
+ model = carbonLoadModel.getCopyWithPartition(
+ split.serializableHadoopSplit.value.getPartition.getUniqueID,
+ split.serializableHadoopSplit.value.getPartition.getFilesPath,
+ carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+ } else {
+ model = carbonLoadModel.getCopyWithPartition(
+ split.serializableHadoopSplit.value.getPartition.getUniqueID)
+ }
+ partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
+ // get this partition data blocks and put it to global static map
+ GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail)
+ StandardLogService.setThreadName(partitionID, null)
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
+ partitionID, split.partitionBlocksDetail.length)
+ } else {
+ // for node partition
+ val split = theSplit.asInstanceOf[CarbonNodePartition]
+ logInfo("Input split: " + split.serializableHadoopSplit)
+ logInfo("The Block Count in this node: " + split.nodeBlocksDetail.length)
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
+ split.serializableHadoopSplit, split.nodeBlocksDetail.length)
+ val blocksID = gernerateBlocksID
+ carbonLoadModel.setBlocksID(blocksID)
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ // set this node blocks info to global static map
+ GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail)
+ if (carbonLoadModel.isDirectLoad) {
+ val filelist: java.util.List[String] = new java.util.ArrayList[String](
+ CarbonCommonConstants.CONSTANT_SIZE_TEN)
+ CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
+ model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
+ carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+ } else {
+ model = carbonLoadModel.getCopyWithPartition(partitionID)
+ }
+ StandardLogService.setThreadName(blocksID, null)
}
}
@@ -316,14 +312,13 @@ class DataFileLoaderRDD[K, V](
* @return
*/
def gernerateBlocksID: String = {
- isTableSplitPartition match {
- case true =>
- carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
- theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
- .getPartition.getUniqueID + "_" + UUID.randomUUID()
- case false =>
- carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
- UUID.randomUUID()
+ if (isTableSplitPartition) {
+ carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+ theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
+ .getPartition.getUniqueID + "_" + UUID.randomUUID()
+ } else {
+ carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+ UUID.randomUUID()
}
}
@@ -351,8 +346,7 @@ class DataFileLoaderRDD[K, V](
CarbonLoaderUtil
.removeSliceFromMemory(model.getDatabaseName, model.getTableName, newSlice)
logInfo(s"Aggregate table creation failed")
- }
- else {
+ } else {
logInfo("Aggregate tables creation successfull")
}
}
@@ -425,6 +419,7 @@ class DataFileLoaderRDD[K, V](
}
var finished = false
+
override def hasNext: Boolean = {
!finished
}
@@ -438,46 +433,46 @@ class DataFileLoaderRDD[K, V](
}
override def getPreferredLocations(split: Partition): Seq[String] = {
- isTableSplitPartition match {
- case true =>
- // for table split partition
- val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
- val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
- location
- case false =>
- // for node partition
- val theSplit = split.asInstanceOf[CarbonNodePartition]
- val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
- logInfo("Preferred Location for split : " + firstOptionLocation(0))
- val blockMap = new util.LinkedHashMap[String, Integer]()
- val tableBlocks = theSplit.blocksDetails
- tableBlocks.foreach(tableBlock => tableBlock.getLocations.foreach(
- location => {
- if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
- val currentCount = blockMap.get(location)
- if (currentCount == null) {
- blockMap.put(location, 1)
- } else {
- blockMap.put(location, currentCount + 1)
- }
+ if (isTableSplitPartition) {
+ // for table split partition
+ val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
+ val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
+ location
+ } else {
+ // for node partition
+ val theSplit = split.asInstanceOf[CarbonNodePartition]
+ val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+ logInfo("Preferred Location for split: " + firstOptionLocation.head)
+ val blockMap = new util.LinkedHashMap[String, Integer]()
+ val tableBlocks = theSplit.blocksDetails
+ tableBlocks.foreach { tableBlock =>
+ tableBlock.getLocations.foreach { location =>
+ if (!firstOptionLocation.exists(location.equalsIgnoreCase)) {
+ val currentCount = blockMap.get(location)
+ if (currentCount == null) {
+ blockMap.put(location, 1)
+ } else {
+ blockMap.put(location, currentCount + 1)
}
}
- )
- )
-
- val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
- nodeCount1.getValue > nodeCount2.getValue
}
- )
+ }
+
+ val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
+ nodeCount1.getValue > nodeCount2.getValue
+ }
+ )
- val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
- firstOptionLocation ++ sortedNodesList
+ val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+ firstOptionLocation ++ sortedNodesList
}
}
+
}
/**
* Use this RDD class to load RDD
+ *
* @param sc
* @param result
* @param carbonLoadModel
@@ -512,7 +507,7 @@ class DataFrameLoaderRDD[K, V](
var partitionID = "0"
val loadMetadataDetails = new LoadMetadataDetails()
var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
- theSplit.index
+ theSplit.index
try {
loadMetadataDetails.setPartitionCount(partitionID)
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
@@ -521,14 +516,14 @@ class DataFrameLoaderRDD[K, V](
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
kettleHomePath, loadCount, loadMetadataDetails)
- loader.initialize
+ loader.initialize()
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
val rddIteratorKey = UUID.randomUUID().toString
- try{
+ try {
RddInputUtils.put(rddIteratorKey,
new RddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel))
carbonLoadModel.setRddIteratorKey(rddIteratorKey)
- loader.run
+ loader.run()
} finally {
RddInputUtils.remove(rddIteratorKey)
}
@@ -540,6 +535,7 @@ class DataFrameLoaderRDD[K, V](
}
var finished = false
+
override def hasNext: Boolean = !finished
override def next(): (K, V) = {
@@ -556,11 +552,12 @@ class DataFrameLoaderRDD[K, V](
/**
* This class wrap Scala's Iterator to Java's Iterator.
* It also convert all columns to string data to use csv data loading flow.
+ *
* @param rddIter
* @param carbonLoadModel
*/
class RddIterator(rddIter: Iterator[Row],
- carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[String]] {
+ carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[String]] {
val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
val format = new SimpleDateFormat(formatString)
@@ -570,9 +567,10 @@ class RddIterator(rddIter: Iterator[Row],
def hasNext: Boolean = rddIter.hasNext
private def getString(value: Any, level: Int = 1): String = {
- value == null match {
- case true => ""
- case false => value match {
+ if (value == null) {
+ ""
+ } else {
+ value match {
case s: String => s
case i: java.lang.Integer => i.toString
case d: java.lang.Double => d.toString
@@ -623,4 +621,5 @@ class RddIterator(rddIter: Iterator[Row],
def remove(): Unit = {
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 4392efe..1382efa 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -32,7 +32,8 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{util => _, _}
import org.apache.spark.sql.{CarbonEnv, DataFrame, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel,
+CompactionModel, Partitioner}
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.util.{FileUtils, SplitUtils}
@@ -44,7 +45,8 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable, CompactionType}
+import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable,
+CompactionType}
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.lcm.status.SegmentStatusManager
import org.apache.carbondata.processing.etl.DataLoadingException
@@ -56,6 +58,7 @@ import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
import org.apache.carbondata.spark.splits.TableSplit
import org.apache.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil}
+
/**
* This is the factory class which can create different RDD depends on user needs.
*
@@ -178,8 +181,12 @@ object CarbonDataRDDFactory extends Logging {
}
def configSplitMaxSize(context: SparkContext, filePaths: String,
- hadoopConfiguration: Configuration): Unit = {
- val defaultParallelism = if (context.defaultParallelism < 1) 1 else context.defaultParallelism
+ hadoopConfiguration: Configuration): Unit = {
+ val defaultParallelism = if (context.defaultParallelism < 1) {
+ 1
+ } else {
+ context.defaultParallelism
+ }
val spaceConsumed = FileUtils.getSpaceOccupied(filePaths)
val blockSize =
hadoopConfiguration.getLongBytes("dfs.blocksize", CarbonCommonConstants.CARBON_256MB)
@@ -191,30 +198,26 @@ object CarbonDataRDDFactory extends Logging {
newSplitSize = CarbonCommonConstants.CARBON_16MB
}
hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString)
- logInfo("totalInputSpaceConsumed : " + spaceConsumed +
- " , defaultParallelism : " + defaultParallelism)
- logInfo("mapreduce.input.fileinputformat.split.maxsize : " + newSplitSize.toString)
+ logInfo(s"totalInputSpaceConsumed: $spaceConsumed , defaultParallelism: $defaultParallelism")
+ logInfo(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString }")
}
}
def alterTableForCompaction(sqlContext: SQLContext,
- alterTableModel: AlterTableModel,
- carbonLoadModel: CarbonLoadModel, partitioner: Partitioner, storePath: String,
- kettleHomePath: String, storeLocation: String): Unit = {
+ alterTableModel: AlterTableModel,
+ carbonLoadModel: CarbonLoadModel, partitioner: Partitioner, storePath: String,
+ kettleHomePath: String, storeLocation: String): Unit = {
var compactionSize: Long = 0
var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
compactionType = CompactionType.MAJOR_COMPACTION
- }
- else {
+ } else {
compactionType = CompactionType.MINOR_COMPACTION
}
- logger
- .audit(s"Compaction request received for table " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
- )
+ logger.audit(s"Compaction request received for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
.getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
@@ -244,9 +247,7 @@ object CarbonDataRDDFactory extends Logging {
// if any other request comes at this time then it will create a compaction request file.
// so that this will be taken up by the compaction process which is executing.
if (!isConcurrentCompactionAllowed) {
- logger
- .info("System level compaction lock is enabled."
- )
+ logger.info("System level compaction lock is enabled.")
handleCompactionForSystemLocking(sqlContext,
carbonLoadModel,
partitioner,
@@ -257,8 +258,7 @@ object CarbonDataRDDFactory extends Logging {
carbonTable,
compactionModel
)
- }
- else {
+ } else {
// normal flow of compaction
val lock = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
@@ -266,10 +266,8 @@ object CarbonDataRDDFactory extends Logging {
)
if (lock.lockWithRetries()) {
- logger
- .info("Acquired the compaction lock for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ logger.info("Acquired the compaction lock for table" +
+ s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
@@ -280,45 +278,37 @@ object CarbonDataRDDFactory extends Logging {
compactionModel,
lock
)
- }
- catch {
- case e : Exception =>
- logger.error("Exception in start compaction thread. " + e.getMessage)
+ } catch {
+ case e: Exception =>
+ logger.error(s"Exception in start compaction thread. ${ e.getMessage }")
lock.unlock()
}
- }
- else {
- logger
- .audit("Not able to acquire the compaction lock for table " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
- )
- logger
- .error("Not able to acquire the compaction lock for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ } else {
+ logger.audit("Not able to acquire the compaction lock for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ logger.error(s"Not able to acquire the compaction lock for table" +
+ s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
sys.error("Table is already locked for compaction. Please try after some time.")
}
}
}
def handleCompactionForSystemLocking(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- partitioner: Partitioner,
- storePath: String,
- kettleHomePath: String,
- storeLocation: String,
- compactionType: CompactionType,
- carbonTable: CarbonTable,
- compactionModel: CompactionModel): Unit = {
+ carbonLoadModel: CarbonLoadModel,
+ partitioner: Partitioner,
+ storePath: String,
+ kettleHomePath: String,
+ storeLocation: String,
+ compactionType: CompactionType,
+ carbonTable: CarbonTable,
+ compactionModel: CompactionModel): Unit = {
val lock = CarbonLockFactory
.getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
)
if (lock.lockWithRetries()) {
- logger
- .info("Acquired the compaction lock for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ logger.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
+ s".${ carbonLoadModel.getTableName }")
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
@@ -329,50 +319,43 @@ object CarbonDataRDDFactory extends Logging {
compactionModel,
lock
)
- }
- catch {
- case e : Exception =>
- logger.error("Exception in start compaction thread. " + e.getMessage)
+ } catch {
+ case e: Exception =>
+ logger.error(s"Exception in start compaction thread. ${ e.getMessage }")
lock.unlock()
// if the compaction is a blocking call then only need to throw the exception.
if (compactionModel.isDDLTrigger) {
throw e
}
}
- }
- else {
- logger
- .audit("Not able to acquire the system level compaction lock for table " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
- )
- logger
- .error("Not able to acquire the compaction lock for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ } else {
+ logger.audit("Not able to acquire the system level compaction lock for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ logger.error("Not able to acquire the compaction lock for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
CarbonCompactionUtil
.createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
// do sys error only in case of DDL trigger.
- if(compactionModel.isDDLTrigger) {
- sys.error("Compaction is in progress, compaction request for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName + " is in queue.")
- }
- else {
- logger
- .error("Compaction is in progress, compaction request for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName + " is in queue."
- )
+ if (compactionModel.isDDLTrigger) {
+ sys.error("Compaction is in progress, compaction request for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+ " is in queue.")
+ } else {
+ logger.error("Compaction is in progress, compaction request for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+ " is in queue.")
}
}
}
def executeCompaction(carbonLoadModel: CarbonLoadModel,
- storePath: String,
- compactionModel: CompactionModel,
- partitioner: Partitioner,
- executor: ExecutorService,
- sqlContext: SQLContext,
- kettleHomePath: String,
- storeLocation: String): Unit = {
+ storePath: String,
+ compactionModel: CompactionModel,
+ partitioner: Partitioner,
+ executor: ExecutorService,
+ sqlContext: SQLContext,
+ kettleHomePath: String,
+ storeLocation: String): Unit = {
val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
carbonLoadModel.getLoadMetadataDetails
)
@@ -413,10 +396,9 @@ object CarbonDataRDDFactory extends Logging {
future.get
}
)
- }
- catch {
+ } catch {
case e: Exception =>
- logger.error("Exception in compaction thread " + e.getMessage)
+ logger.error(s"Exception in compaction thread ${ e.getMessage }")
throw e
}
@@ -442,22 +424,23 @@ object CarbonDataRDDFactory extends Logging {
)
}
}
+
/**
* This will submit the loads to be merged into the executor.
*
* @param futureList
*/
def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
- loadsToMerge: util
- .List[LoadMetadataDetails],
- executor: ExecutorService,
- storePath: String,
- sqlContext: SQLContext,
- compactionModel: CompactionModel,
- kettleHomePath: String,
- carbonLoadModel: CarbonLoadModel,
- partitioner: Partitioner,
- storeLocation: String): Unit = {
+ loadsToMerge: util
+ .List[LoadMetadataDetails],
+ executor: ExecutorService,
+ storePath: String,
+ sqlContext: SQLContext,
+ compactionModel: CompactionModel,
+ kettleHomePath: String,
+ carbonLoadModel: CarbonLoadModel,
+ partitioner: Partitioner,
+ storeLocation: String): Unit = {
loadsToMerge.asScala.foreach(seg => {
logger.info("loads identified for merge is " + seg.getLoadName)
@@ -484,13 +467,13 @@ object CarbonDataRDDFactory extends Logging {
}
def startCompactionThreads(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- partitioner: Partitioner,
- storePath: String,
- kettleHomePath: String,
- storeLocation: String,
- compactionModel: CompactionModel,
- compactionLock: ICarbonLock): Unit = {
+ carbonLoadModel: CarbonLoadModel,
+ partitioner: Partitioner,
+ storePath: String,
+ kettleHomePath: String,
+ storeLocation: String,
+ compactionModel: CompactionModel,
+ compactionLock: ICarbonLock): Unit = {
val executor: ExecutorService = Executors.newFixedThreadPool(1)
// update the updated table status.
readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -499,138 +482,123 @@ object CarbonDataRDDFactory extends Logging {
// clean up of the stale segments.
try {
CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
- }
- catch {
+ } catch {
case e: Exception =>
- logger
- .error("Exception in compaction thread while clean up of stale segments " + e
- .getMessage
- )
+ logger.error(s"Exception in compaction thread while clean up of stale segments" +
+ s" ${ e.getMessage }")
}
- val compactionThread = new Thread {
- override def run(): Unit = {
+ val compactionThread = new Thread {
+ override def run(): Unit = {
+ try {
+ // compaction status of the table which is triggered by the user.
+ var triggeredCompactionStatus = false
+ var exception: Exception = null
try {
- // compaction status of the table which is triggered by the user.
- var triggeredCompactionStatus = false
- var exception : Exception = null
- try {
- executeCompaction(carbonLoadModel: CarbonLoadModel,
- storePath: String,
- compactionModel: CompactionModel,
- partitioner: Partitioner,
- executor, sqlContext, kettleHomePath, storeLocation
+ executeCompaction(carbonLoadModel: CarbonLoadModel,
+ storePath: String,
+ compactionModel: CompactionModel,
+ partitioner: Partitioner,
+ executor, sqlContext, kettleHomePath, storeLocation
+ )
+ triggeredCompactionStatus = true
+ } catch {
+ case e: Exception =>
+ logger.error(s"Exception in compaction thread ${ e.getMessage }")
+ exception = e
+ }
+ // continue in case of exception also, check for all the tables.
+ val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+ CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+ ).equalsIgnoreCase("true")
+
+ if (!isConcurrentCompactionAllowed) {
+ logger.info("System level compaction lock is enabled.")
+ val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
+ var tableForCompaction = CarbonCompactionUtil
+ .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
+ .tablesMeta.toArray, skipCompactionTables.toList.asJava
)
- triggeredCompactionStatus = true
- }
- catch {
- case e: Exception =>
- logger.error("Exception in compaction thread " + e.getMessage)
- exception = e
- }
- // continue in case of exception also, check for all the tables.
- val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
- CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
- ).equalsIgnoreCase("true")
-
- if (!isConcurrentCompactionAllowed) {
- logger.info("System level compaction lock is enabled.")
- val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
- var tableForCompaction = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
- .tablesMeta.toArray, skipCompactionTables.toList.asJava
+ while (null != tableForCompaction) {
+ logger.info("Compaction request has been identified for table " +
+ s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+ s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ val table: CarbonTable = tableForCompaction.carbonTable
+ val metadataPath = table.getMetaDataFilepath
+ val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
+
+ val newCarbonLoadModel = new CarbonLoadModel()
+ prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
+ val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+ newCarbonLoadModel.getTableName
)
- while (null != tableForCompaction) {
- logger
- .info("Compaction request has been identified for table " + tableForCompaction
- .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier
- .getTableName
- )
- val table: CarbonTable = tableForCompaction.carbonTable
- val metadataPath = table.getMetaDataFilepath
- val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
-
- val newCarbonLoadModel = new CarbonLoadModel()
- prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
- val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
- .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
- newCarbonLoadModel.getTableName
- )
-
- val compactionSize = CarbonDataMergerUtil
- .getCompactionSize(CompactionType.MAJOR_COMPACTION)
-
- val newcompactionModel = CompactionModel(compactionSize,
- compactionType,
- table,
- tableCreationTime,
- compactionModel.isDDLTrigger
+
+ val compactionSize = CarbonDataMergerUtil
+ .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+
+ val newcompactionModel = CompactionModel(compactionSize,
+ compactionType,
+ table,
+ tableCreationTime,
+ compactionModel.isDDLTrigger
+ )
+ // proceed for compaction
+ try {
+ executeCompaction(newCarbonLoadModel,
+ newCarbonLoadModel.getStorePath,
+ newcompactionModel,
+ partitioner,
+ executor, sqlContext, kettleHomePath, storeLocation
)
- // proceed for compaction
- try {
- executeCompaction(newCarbonLoadModel,
- newCarbonLoadModel.getStorePath,
- newcompactionModel,
- partitioner,
- executor, sqlContext, kettleHomePath, storeLocation
- )
+ } catch {
+ case e: Exception =>
+ logger.error("Exception in compaction thread for table " +
+ s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+ s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+ // not handling the exception. only logging as this is not the table triggered
+ // by user.
+ } finally {
+ // delete the compaction required file in case of failure or success also.
+ if (!CarbonCompactionUtil
+ .deleteCompactionRequiredFile(metadataPath, compactionType)) {
+ // if the compaction request file is not been able to delete then
+ // add those tables details to the skip list so that it wont be considered next.
+ skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
+ logger.error("Compaction request file can not be deleted for table " +
+ s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+ s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
}
- catch {
- case e: Exception =>
- logger.error("Exception in compaction thread for table " + tableForCompaction
- .carbonTable.getDatabaseName + "." +
- tableForCompaction.carbonTableIdentifier
- .getTableName)
- // not handling the exception. only logging as this is not the table triggered
- // by user.
- }
- finally {
- // delete the compaction required file in case of failure or success also.
- if (!CarbonCompactionUtil
- .deleteCompactionRequiredFile(metadataPath, compactionType)) {
- // if the compaction request file is not been able to delete then
- // add those tables details to the skip list so that it wont be considered next.
- skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
- logger
- .error("Compaction request file can not be deleted for table " +
- tableForCompaction
- .carbonTable.getDatabaseName + "." + tableForCompaction
- .carbonTableIdentifier
- .getTableName
- )
-
- }
- }
- // ********* check again for all the tables.
- tableForCompaction = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
- .tablesMeta.toArray, skipCompactionTables.asJava
- )
- }
- // giving the user his error for telling in the beeline if his triggered table
- // compaction is failed.
- if (!triggeredCompactionStatus) {
- throw new Exception("Exception in compaction " + exception.getMessage)
}
+ // ********* check again for all the tables.
+ tableForCompaction = CarbonCompactionUtil
+ .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
+ .tablesMeta.toArray, skipCompactionTables.asJava
+ )
+ }
+ // giving the user his error for telling in the beeline if his triggered table
+ // compaction is failed.
+ if (!triggeredCompactionStatus) {
+ throw new Exception("Exception in compaction " + exception.getMessage)
}
}
- finally {
- executor.shutdownNow()
- deletePartialLoadsInCompaction(carbonLoadModel)
- compactionLock.unlock()
- }
+ } finally {
+ executor.shutdownNow()
+ deletePartialLoadsInCompaction(carbonLoadModel)
+ compactionLock.unlock()
}
}
+ }
// calling the run method of a thread to make the call as blocking call.
// in the future we may make this as concurrent.
compactionThread.run()
}
def prepareCarbonLoadModel(storePath: String,
- table: CarbonTable,
- newCarbonLoadModel: CarbonLoadModel): Unit = {
+ table: CarbonTable,
+ newCarbonLoadModel: CarbonLoadModel): Unit = {
newCarbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
newCarbonLoadModel.setTableName(table.getFactTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
@@ -651,13 +619,10 @@ object CarbonDataRDDFactory extends Logging {
// so deleting those folders.
try {
CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
- }
- catch {
+ } catch {
case e: Exception =>
- logger
- .error("Exception in compaction thread while clean up of stale segments " + e
- .getMessage
- )
+ logger.error(s"Exception in compaction thread while clean up of stale segments" +
+ s" ${ e.getMessage }")
}
}
@@ -674,13 +639,11 @@ object CarbonDataRDDFactory extends Logging {
val isAgg = false
// for handling of the segment Merging.
def handleSegmentMerging(tableCreationTime: Long): Unit = {
- logger
- .info("compaction need status is " + CarbonDataMergerUtil.checkIfAutoLoadMergingRequired())
+ logger.info(s"compaction need status is" +
+ s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
- logger
- .audit("Compaction request received for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ logger.audit(s"Compaction request received for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val compactionSize = 0
val isCompactionTriggerByDDl = false
val compactionModel = CompactionModel(compactionSize,
@@ -717,8 +680,7 @@ object CarbonDataRDDFactory extends Logging {
carbonTable,
compactionModel
)
- }
- else {
+ } else {
val lock = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
LockUsage.COMPACTION_LOCK
@@ -736,37 +698,34 @@ object CarbonDataRDDFactory extends Logging {
compactionModel,
lock
)
- }
- catch {
- case e : Exception =>
- logger.error("Exception in start compaction thread. " + e.getMessage)
+ } catch {
+ case e: Exception =>
+ logger.error(s"Exception in start compaction thread. ${ e.getMessage }")
lock.unlock()
throw e
}
- }
- else {
- logger
- .audit("Not able to acquire the compaction lock for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
- logger
- .error("Not able to acquire the compaction lock for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ } else {
+ logger.audit("Not able to acquire the compaction lock for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${
+ carbonLoadModel
+ .getTableName
+ }")
+ logger.error("Not able to acquire the compaction lock for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${
+ carbonLoadModel
+ .getTableName
+ }")
}
}
}
}
try {
- logger
- .audit("Data load request has been received for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ logger.audit(s"Data load request has been received for table" +
+ s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
if (!useKettle) {
- logger.audit("Data is loading with New Data Flow for table " + carbonLoadModel
- .getDatabaseName + "." + carbonLoadModel.getTableName
- )
+ logger.audit("Data is loading with New Data Flow for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
// Check if any load need to be deleted before loading new data
deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, partitioner, storePath,
@@ -801,13 +760,10 @@ object CarbonDataRDDFactory extends Logging {
// so deleting those folders.
try {
CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
- }
- catch {
+ } catch {
case e: Exception =>
logger
- .error("Exception in data load while clean up of stale segments " + e
- .getMessage
- )
+ .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
}
// reading the start time of data load.
@@ -826,14 +782,14 @@ object CarbonDataRDDFactory extends Logging {
var blocksGroupBy: Array[(String, Array[BlockDetails])] = null
var status: Array[(String, LoadMetadataDetails)] = null
- def loadDataFile(): Unit = { isTableSplitPartition match {
- case true =>
+ def loadDataFile(): Unit = {
+ if (isTableSplitPartition) {
/*
- * when data handle by table split partition
- * 1) get partition files, direct load or not will get the different files path
- * 2) get files blocks by using SplitUtils
- * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy
- */
+ * when data handle by table split partition
+ * 1) get partition files, direct load or not will get the different files path
+ * 2) get files blocks by using SplitUtils
+ * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy
+ */
var splits = Array[TableSplit]()
if (carbonLoadModel.isDirectLoad) {
// get all table Splits, this part means files were divide to different partitions
@@ -865,7 +821,7 @@ object CarbonDataRDDFactory extends Logging {
val pathBuilder = new StringBuilder()
pathBuilder.append(carbonLoadModel.getFactFilePath)
if (!carbonLoadModel.getFactFilePath.endsWith("/")
- && !carbonLoadModel.getFactFilePath.endsWith("\\")) {
+ && !carbonLoadModel.getFactFilePath.endsWith("\\")) {
pathBuilder.append("/")
}
pathBuilder.append(split.getPartition.getUniqueID).append("/")
@@ -873,16 +829,15 @@ object CarbonDataRDDFactory extends Logging {
SplitUtils.getSplits(pathBuilder.toString, sqlContext.sparkContext))
}
}
-
- case false =>
+ } else {
/*
- * when data load handle by node partition
- * 1)clone the hadoop configuration,and set the file path to the configuration
- * 2)use NewHadoopRDD to get split,size:Math.max(minSize, Math.min(maxSize, blockSize))
- * 3)use DummyLoadRDD to group blocks by host,and let spark balance the block location
- * 4)DummyLoadRDD output (host,Array[BlockDetails])as the parameter to CarbonDataLoadRDD
- * which parititon by host
- */
+ * when data load handle by node partition
+ * 1)clone the hadoop configuration,and set the file path to the configuration
+ * 2)use NewHadoopRDD to get split,size:Math.max(minSize, Math.min(maxSize, blockSize))
+ * 3)use DummyLoadRDD to group blocks by host,and let spark balance the block location
+ * 4)DummyLoadRDD output (host,Array[BlockDetails])as the parameter to CarbonDataLoadRDD
+ * which parititon by host
+ */
val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
// FileUtils will skip file which is no csv, and return all file path which split by ','
val filePaths = carbonLoadModel.getFactFilePath
@@ -921,9 +876,11 @@ object CarbonDataRDDFactory extends Logging {
.nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
.toSeq
val timeElapsed: Long = System.currentTimeMillis - startTime
- logInfo("Total Time taken in block allocation : " + timeElapsed)
- logInfo("Total no of blocks : " + blockList.size
- + ", No.of Nodes : " + nodeBlockMapping.size
+ logInfo("Total Time taken in block allocation: " + timeElapsed)
+ logInfo(s"Total no of blocks: ${ blockList.length }, No.of Nodes: ${
+ nodeBlockMapping
+ .size
+ }"
)
var str = ""
nodeBlockMapping.foreach(entry => {
@@ -983,7 +940,7 @@ object CarbonDataRDDFactory extends Logging {
var rdd = dataFrame.get.rdd
var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length
numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length))
- rdd = rdd.coalesce(numPartitions, false)
+ rdd = rdd.coalesce(numPartitions, shuffle = false)
status = new DataFrameLoaderRDD(sqlContext.sparkContext,
new DataLoadResultImpl(),
@@ -1061,37 +1018,34 @@ object CarbonDataRDDFactory extends Logging {
CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
logInfo("********clean up done**********")
logger.audit(s"Data load is failed for " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
logWarning("Cannot write load metadata file as data load failed")
throw new Exception(errorMessage)
} else {
- val metadataDetails = status(0)._2
- if (!isAgg) {
- val status = CarbonLoaderUtil
- .recordLoadMetadata(currentLoadCount,
- metadataDetails,
- carbonLoadModel,
- loadStatus,
- loadStartTime
- )
- if (!status) {
- val errorMessage = "Dataload failed due to failure in table status updation."
- logger.audit("Data load is failed for " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
- logger.error("Dataload failed due to failure in table status updation.")
- throw new Exception(errorMessage)
- }
- } else if (!carbonLoadModel.isRetentionRequest) {
- // TODO : Handle it
- logInfo("********Database updated**********")
+ val metadataDetails = status(0)._2
+ if (!isAgg) {
+ val status = CarbonLoaderUtil.recordLoadMetadata(currentLoadCount, metadataDetails,
+ carbonLoadModel, loadStatus, loadStartTime)
+ if (!status) {
+ val errorMessage = "Dataload failed due to failure in table status updation."
+ logger.audit("Data load is failed for " +
+ s"${ carbonLoadModel.getDatabaseName }.${
+ carbonLoadModel
+ .getTableName
+ }")
+ logger.error("Dataload failed due to failure in table status updation.")
+ throw new Exception(errorMessage)
}
- logger.audit("Data load is successful for " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
+ } else if (!carbonLoadModel.isRetentionRequest) {
+ // TODO : Handle it
+ logInfo("********Database updated**********")
+ }
+ logger.audit("Data load is successful for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
// compaction handling
handleSegmentMerging(tableCreationTime)
- }
- catch {
+ } catch {
case e: Exception =>
throw new Exception(
"Dataload is success. Auto-Compaction has failed. Please check logs.")
@@ -1111,10 +1065,10 @@ object CarbonDataRDDFactory extends Logging {
}
def deleteLoadsAndUpdateMetadata(
- carbonLoadModel: CarbonLoadModel,
- table: CarbonTable, partitioner: Partitioner,
- storePath: String,
- isForceDeletion: Boolean) {
+ carbonLoadModel: CarbonLoadModel,
+ table: CarbonTable, partitioner: Partitioner,
+ storePath: String,
+ isForceDeletion: Boolean) {
if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
val loadMetadataFilePath = CarbonLoaderUtil
.extractLoadMetadataFileLocation(carbonLoadModel)
@@ -1132,36 +1086,34 @@ object CarbonDataRDDFactory extends Logging {
if (isUpdationRequired) {
try {
- // Update load metadate file after cleaning deleted nodes
- if (carbonTableStatusLock.lockWithRetries()) {
- logger.info("Table status lock has been successfully acquired.")
+ // Update load metadata file after cleaning deleted nodes
+ if (carbonTableStatusLock.lockWithRetries()) {
+ logger.info("Table status lock has been successfully acquired.")
- // read latest table status again.
- val latestMetadata = segmentStatusManager
- .readLoadMetadata(loadMetadataFilePath)
+ // read latest table status again.
+ val latestMetadata = segmentStatusManager.readLoadMetadata(loadMetadataFilePath)
- // update the metadata details from old to new status.
+ // update the metadata details from old to new status.
+ val latestStatus = CarbonLoaderUtil
+ .updateLoadMetadataFromOldToNew(details, latestMetadata)
- val latestStatus = CarbonLoaderUtil
- .updateLoadMetadataFromOldToNew(details, latestMetadata)
-
- CarbonLoaderUtil.writeLoadMetadata(
- carbonLoadModel.getCarbonDataLoadSchema,
- carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, latestStatus
- )
- }
- else {
- val errorMsg = "Clean files request is failed for " + carbonLoadModel.getDatabaseName +
- "." + carbonLoadModel.getTableName +
- ". Not able to acquire the table status lock due to other operation " +
- "running in the background."
- logger.audit(errorMsg)
- logger.error(errorMsg)
- throw new Exception(errorMsg + " Please try after some time.")
+ CarbonLoaderUtil.writeLoadMetadata(
+ carbonLoadModel.getCarbonDataLoadSchema,
+ carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName, latestStatus
+ )
+ } else {
+ val errorMsg = "Clean files request is failed for " +
+ s"${ carbonLoadModel.getDatabaseName }." +
+ s"${ carbonLoadModel.getTableName }" +
+ ". Not able to acquire the table status lock due to other operation " +
+ "running in the background."
+ logger.audit(errorMsg)
+ logger.error(errorMsg)
+ throw new Exception(errorMsg + " Please try after some time.")
- }
- } finally {
+ }
+ } finally {
CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
}
}
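The restructured hunk above keeps the unlock in a finally block so the table status lock is always released; a generic sketch of the same acquire-with-retries / finally-unlock shape, with a hypothetical Lock trait standing in for the project's lock classes:

    // Hypothetical stand-in for the lock abstraction used above.
    trait Lock {
      def lockWithRetries(): Boolean
      def unlock(): Boolean
    }

    def withStatusLock[T](lock: Lock)(body: => T): T = {
      if (!lock.lockWithRetries()) {
        throw new Exception("Not able to acquire the table status lock due to other " +
          "operation running in the background. Please try after some time.")
      }
      try {
        body          // e.g. re-read the latest table status and rewrite the metadata file
      } finally {
        lock.unlock() // always release, even if the update above throws
      }
    }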
@@ -1197,10 +1149,9 @@ object CarbonDataRDDFactory extends Logging {
partitioner,
storePath,
isForceDeletion = true)
- }
- else {
- val errorMsg = "Clean files request is failed for " + carbonLoadModel.getDatabaseName +
- "." + carbonLoadModel.getTableName +
+ } else {
+ val errorMsg = "Clean files request is failed for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
". Not able to acquire the clean files lock due to another clean files " +
"operation is running in the background."
logger.audit(errorMsg)
@@ -1208,10 +1159,10 @@ object CarbonDataRDDFactory extends Logging {
throw new Exception(errorMsg + " Please try after some time.")
}
- }
- finally {
+ } finally {
CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
}
}
+
}
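Many hunks above replace string concatenation with the s-interpolator, per the commit message; a minimal before/after sketch of the pattern (databaseName and tableName are placeholder values):

    val databaseName = "default"      // placeholder values for the sketch
    val tableName = "carbon_table"

    // before: concatenation
    val msgConcat = "Data load is successful for " + databaseName + "." + tableName

    // after: interpolation, as used in the audit logging above
    val msgInterp = s"Data load is successful for $databaseName.$tableName"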
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index 8c52249..17b487c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -86,7 +86,7 @@ class CarbonDeleteLoadByDateRDD[K, V](
override def getPreferredLocations(split: Partition): Seq[String] = {
val theSplit = split.asInstanceOf[CarbonLoadPartition]
val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
- logInfo("Host Name : " + s.head + s.length)
+ logInfo("Host Name: " + s.head + s.length)
s
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index df40ed7..57bf124 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -77,7 +77,7 @@ class CarbonDeleteLoadRDD[V: ClassTag](
override def getPreferredLocations(split: Partition): Seq[String] = {
val theSplit = split.asInstanceOf[CarbonLoadPartition]
val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
- logInfo("Host Name : " + s.head + s.length)
+ logInfo("Host Name: " + s.head + s.length)
s
}
}
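Both delete RDDs above override getPreferredLocations so Spark schedules each partition's task on a host that holds the corresponding data; a compact, self-contained sketch of that hook on a custom RDD (the locations map is an assumed input, not CarbonData's split type):

    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    class LocalityAwareRDD(sc: SparkContext, locations: Map[Int, Seq[String]], slices: Int)
      extends RDD[Int](sc, Nil) {

      override def getPartitions: Array[Partition] =
        (0 until slices).map(i => new Partition { override def index: Int = i }).toArray

      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
        Iterator(split.index)

      // Hint the scheduler towards the hosts holding this partition's data.
      override def getPreferredLocations(split: Partition): Seq[String] =
        locations.getOrElse(split.index, Seq.empty)
    }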
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index b7da579..bce4eb2 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifie
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.load.CarbonLoaderUtil
@@ -67,6 +67,7 @@ trait GenericParser {
case class DictionaryStats(distinctValues: java.util.List[String],
dictWriteTime: Long, sortIndexWriteTime: Long)
+
case class PrimitiveParser(dimension: CarbonDimension,
setOpt: Option[HashSet[String]]) extends GenericParser {
val (hasDictEncoding, set: HashSet[String]) = setOpt match {
@@ -164,20 +165,21 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S
* A RDD to combine all dictionary distinct values.
*
* @constructor create a RDD with RDD[(String, Iterable[String])]
- * @param prev the input RDD[(String, Iterable[String])]
+ * @param prev the input RDD[(String, Iterable[String])]
* @param model a model package load info
*/
class CarbonAllDictionaryCombineRDD(
- prev: RDD[(String, Iterable[String])],
- model: DictionaryLoadModel)
+ prev: RDD[(String, Iterable[String])],
+ model: DictionaryLoadModel)
extends RDD[(Int, ColumnDistinctValues)](prev) with Logging {
- override def getPartitions: Array[Partition] =
+ override def getPartitions: Array[Partition] = {
firstParent[(String, Iterable[String])].partitions
+ }
override def compute(split: Partition, context: TaskContext
- ): Iterator[(Int, ColumnDistinctValues)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass().getName())
+ ): Iterator[(Int, ColumnDistinctValues)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
/*
@@ -240,7 +242,7 @@ class CarbonBlockDistinctValuesCombineRDD(
override def compute(split: Partition,
context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordLoadCsvfilesToDfTime()
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
var rowCount = 0L
try {
@@ -259,7 +261,7 @@ class CarbonBlockDistinctValuesCombineRDD(
}
}
}
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordLoadCsvfilesToDfTime()
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
} catch {
case ex: Exception =>
LOGGER.error(ex)
@@ -288,7 +290,7 @@ class CarbonGlobalDictionaryGenerateRDD(
override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass().getName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
var isHighCardinalityColumn = false
val iter = new Iterator[(Int, String, Boolean)] {
@@ -303,11 +305,11 @@ class CarbonGlobalDictionaryGenerateRDD(
model.hdfsTempLocation)
}
if (StringUtils.isNotBlank(model.lockType)) {
- CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
- model.lockType)
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
+ model.lockType)
}
if (StringUtils.isNotBlank(model.zooKeeperUrl)) {
- CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
model.zooKeeperUrl)
}
val dictLock = CarbonLockFactory
@@ -320,7 +322,7 @@ class CarbonGlobalDictionaryGenerateRDD(
val valuesBuffer = new mutable.HashSet[String]
val rddIter = firstParent[(Int, ColumnDistinctValues)].iterator(split, context)
var rowCount = 0L
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDicShuffleAndWriteTime()
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
breakable {
while (rddIter.hasNext) {
val distinctValueList = rddIter.next()._2
@@ -329,7 +331,7 @@ class CarbonGlobalDictionaryGenerateRDD(
// check high cardinality
if (model.isFirstLoad && model.highCardIdentifyEnable
&& !model.isComplexes(split.index)
- && model.dimensions(split.index).isColumnar()) {
+ && model.dimensions(split.index).isColumnar) {
isHighCardinalityColumn = GlobalDictionaryUtil.isHighCardinalityColumn(
valuesBuffer.size, rowCount, model)
if (isHighCardinalityColumn) {
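A rough sketch of the high-cardinality check referenced above: compare the number of distinct values collected so far against the scanned row count and a threshold (the 0.8 figure and helper below are assumptions for illustration, not CarbonData's actual configuration):

    // Treat a column as high cardinality when most scanned rows carry distinct values.
    def isHighCardinality(distinctCount: Long, rowCount: Long, threshold: Double = 0.8): Boolean =
      rowCount > 0 && distinctCount.toDouble / rowCount > threshold

    // e.g. 950,000 distinct values out of 1,000,000 rows => skip dictionary generation
    val skipDictionary = isHighCardinality(950000L, 1000000L)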
@@ -338,10 +340,13 @@ class CarbonGlobalDictionaryGenerateRDD(
}
}
}
- val combineListTime = (System.currentTimeMillis() - t1)
+ val combineListTime = System.currentTimeMillis() - t1
if (isHighCardinalityColumn) {
- LOGGER.info("column " + model.table.getTableUniqueName + "." +
- model.primDimensions(split.index).getColName + " is high cardinality column")
+ LOGGER.info(s"column ${ model.table.getTableUniqueName }." +
+ s"${
+ model.primDimensions(split.index)
+ .getColName
+ } is high cardinality column")
} else {
isDictionaryLocked = dictLock.lockWithRetries()
if (isDictionaryLocked) {
@@ -367,7 +372,7 @@ class CarbonGlobalDictionaryGenerateRDD(
} else {
null
}
- val dictCacheTime = (System.currentTimeMillis - t2)
+ val dictCacheTime = System.currentTimeMillis - t2
val t3 = System.currentTimeMillis()
val dictWriteTask = new DictionaryWriterTask(valuesBuffer,
dictionaryForDistinctValueLookUp,
@@ -375,7 +380,7 @@ class CarbonGlobalDictionaryGenerateRDD(
split.index)
// execute dictionary writer task to get distinct values
val distinctValues = dictWriteTask.execute()
- val dictWriteTime = (System.currentTimeMillis() - t3)
+ val dictWriteTime = System.currentTimeMillis() - t3
val t4 = System.currentTimeMillis()
// if new data came then rewrite sort index file
if (distinctValues.size() > 0) {
@@ -385,22 +390,21 @@ class CarbonGlobalDictionaryGenerateRDD(
distinctValues)
sortIndexWriteTask.execute()
}
- val sortIndexWriteTime = (System.currentTimeMillis() - t4)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDicShuffleAndWriteTime()
+ val sortIndexWriteTime = System.currentTimeMillis() - t4
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
// After sortIndex writing, update dictionaryMeta
dictWriteTask.updateMetaData()
// clear the value buffer after writing dictionary data
valuesBuffer.clear
- org.apache.carbondata.core.util.CarbonUtil
- .clearDictionaryCache(dictionaryForDistinctValueLookUp);
+ CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
dictionaryForDistinctValueLookUpCleared = true
- LOGGER.info("\n columnName:" + model.primDimensions(split.index).getColName +
- "\n columnId:" + model.primDimensions(split.index).getColumnId +
- "\n new distinct values count:" + distinctValues.size() +
- "\n combine lists:" + combineListTime +
- "\n create dictionary cache:" + dictCacheTime +
- "\n sort list, distinct and write:" + dictWriteTime +
- "\n write sort info:" + sortIndexWriteTime)
+ LOGGER.info(s"\n columnName: ${ model.primDimensions(split.index).getColName }" +
+ s"\n columnId: ${ model.primDimensions(split.index).getColumnId }" +
+ s"\n new distinct values count: ${ distinctValues.size() }" +
+ s"\n combine lists: $combineListTime" +
+ s"\n create dictionary cache: $dictCacheTime" +
+ s"\n sort list, distinct and write: $dictWriteTime" +
+ s"\n write sort info: $sortIndexWriteTime")
}
} catch {
case ex: Exception =>
@@ -408,11 +412,9 @@ class CarbonGlobalDictionaryGenerateRDD(
throw ex
} finally {
if (!dictionaryForDistinctValueLookUpCleared) {
- org.apache.carbondata.core.util.CarbonUtil
- .clearDictionaryCache(dictionaryForDistinctValueLookUp);
+ CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
}
- org.apache.carbondata.core.util.CarbonUtil
- .clearDictionaryCache(dictionaryForSortIndexWriting);
+ CarbonUtil.clearDictionaryCache(dictionaryForSortIndexWriting)
if (dictLock != null && isDictionaryLocked) {
if (dictLock.unlock()) {
logInfo(s"Dictionary ${
@@ -441,14 +443,17 @@ class CarbonGlobalDictionaryGenerateRDD(
(split.index, status, isHighCardinalityColumn)
}
}
+
iter
}
+
}
+
/**
* Set column dictionary partition format
*
- * @param id partition id
- * @param dimension current carbon dimension
+ * @param id partition id
+ * @param dimension current carbon dimension
*/
class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
extends Partition {
@@ -460,13 +465,13 @@ class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
/**
* Use external column dict to generate global dictionary
*
- * @param carbonLoadModel carbon load model
- * @param sparkContext spark context
- * @param table carbon table identifier
- * @param dimensions carbon dimenisons having predefined dict
- * @param hdfsLocation carbon base store path
+ * @param carbonLoadModel carbon load model
+ * @param sparkContext spark context
+ * @param table carbon table identifier
+ * @param dimensions carbon dimensions having predefined dict
+ * @param hdfsLocation carbon base store path
* @param dictFolderPath path of dictionary folder
-*/
+ */
class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
dictionaryLoadModel: DictionaryLoadModel,
sparkContext: SparkContext,
@@ -505,25 +510,25 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
} catch {
case ex: Exception =>
logError(s"Error in reading pre-defined " +
- s"dictionary file:${ex.getMessage}")
+ s"dictionary file:${ ex.getMessage }")
throw ex
} finally {
if (csvReader != null) {
try {
- csvReader.close
+ csvReader.close()
} catch {
case ex: Exception =>
logError(s"Error in closing csvReader of " +
- s"pre-defined dictionary file:${ex.getMessage}")
+ s"pre-defined dictionary file:${ ex.getMessage }")
}
}
if (inputStream != null) {
try {
- inputStream.close
+ inputStream.close()
} catch {
case ex: Exception =>
logError(s"Error in closing inputStream of " +
- s"pre-defined dictionary file:${ex.getMessage}")
+ s"pre-defined dictionary file:${ ex.getMessage }")
}
}
}
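The cleanup above closes csvReader and inputStream in a finally block, each wrapped in its own try/catch so a failure closing one does not mask the other; a compact loan-pattern sketch of the same idea for any java.io.Closeable:

    import java.io.Closeable

    // Run `body` with a resource and always attempt to close it afterwards,
    // logging (rather than rethrowing) any failure during close.
    def withCloseable[R <: Closeable, T](resource: R)(body: R => T): T = {
      try {
        body(resource)
      } finally {
        try {
          resource.close()
        } catch {
          case ex: Exception =>
            println(s"Error in closing resource: ${ ex.getMessage }")
        }
      }
    }

    // usage: withCloseable(new java.io.FileInputStream("dict.csv")) { in => /* read */ }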