Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:48 UTC
[10/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
new file mode 100644
index 0000000..1d8d6b2
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
+import org.apache.spark.sql.Row
+import org.apache.spark.util.SparkUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
+import org.apache.carbondata.processing.constants.DataProcessorConstants
+import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
+import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
+import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This partition class is used to split by TableSplit.
+ *
+ */
+class CarbonTableSplitPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit,
+ val blocksDetails: Array[BlockDetails])
+ extends Partition {
+
+ override val index: Int = idx
+ val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit)
+ val partitionBlocksDetail = blocksDetails
+
+ override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
+
+/**
+ * This partition class is used to split by Host.
+ *
+ */
+class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
+ val blocksDetails: Array[BlockDetails])
+ extends Partition {
+
+ override val index: Int = idx
+ val serializableHadoopSplit = host
+ val nodeBlocksDetail = blocksDetails
+
+ override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
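
Both partition flavours carry the block details assigned to them and derive a stable hashCode from the RDD id and partition index. As a rough sketch (illustrative only; the empty BlockDetails array is a placeholder, real callers pass the blocks computed for that host):

    val part = new CarbonNodePartition(rddId = 1, idx = 0, host = "host-1",
      blocksDetails = Array.empty[BlockDetails])
    assert(part.index == 0 && part.serializableHadoopSplit == "host-1")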
+
+class SparkPartitionLoader(model: CarbonLoadModel,
+ splitIndex: Int,
+ storePath: String,
+ kettleHomePath: String,
+ loadCount: Int,
+ loadMetadataDetails: LoadMetadataDetails) {
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ var storeLocation: String = ""
+
+ def initialize(): Unit = {
+ val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
+ if (null == carbonPropertiesFilePath) {
+ System.setProperty("carbon.properties.filepath",
+ System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
+ }
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
+ CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
+ CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
+ CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true")
+ CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true")
+ CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true")
+ CarbonProperties.getInstance().addProperty("high.cardinality.value", "100000")
+ CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false")
+ CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000")
+
+ // this property is used to determine whether temp location for carbon is inside
+ // container temp dir or is yarn application directory.
+ val carbonUseLocalDir = CarbonProperties.getInstance()
+ .getProperty("carbon.use.local.dir", "false")
+ if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+ val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+ if (null != storeLocations && storeLocations.nonEmpty) {
+ storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+ }
+ if (storeLocation == null || storeLocation.isEmpty) {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
+ } else {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
+ storeLocation = storeLocation + '/' + System.nanoTime() + '/' + splitIndex
+ }
+
+ def run(): Unit = {
+ try {
+ CarbonLoaderUtil.executeGraph(model, storeLocation, storePath,
+ kettleHomePath)
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ } catch {
+ case e: DataLoadingException => if (e.getErrorCode ==
+ DataProcessorConstants.BAD_REC_FOUND) {
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+ LOGGER.info("Bad Record Found")
+ } else {
+ throw e
+ }
+ case e: Exception =>
+ throw e
+ } finally {
+ // delete temp location data
+ try {
+ val isCompaction = false
+ CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, isCompaction)
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e, "Failed to delete local data")
+ }
+ if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+ loadMetadataDetails.getLoadStatus)) {
+ if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+ .equals(loadMetadataDetails.getLoadStatus)) {
+ LOGGER.info("DataLoad complete")
+ LOGGER.info("Data Load partially successful with LoadCount:" + loadCount)
+ } else {
+ LOGGER.info("DataLoad complete")
+ LOGGER.info("Data Loaded successfully with LoadCount:" + loadCount)
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.printStatisticsInfo(
+ model.getPartitionId)
+ }
+ }
+ }
+ }
+}
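
For orientation, a minimal sketch of the lifecycle a task follows with this helper; the model, paths and details object are assumed to be prepared by the caller (DataFileLoaderRDD.compute below does exactly this), and the paths shown are placeholders:

    val details = new LoadMetadataDetails()
    val loader = new SparkPartitionLoader(model, splitIndex = 0,
      storePath = "/tmp/carbon/store", kettleHomePath = "/opt/kettle",
      loadCount = 0, loadMetadataDetails = details)
    loader.initialize()   // picks a node-local temp store location
    loader.run()          // runs the load graph and records the load status on `details`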
+
+/**
+ * Use this RDD class to load CSV data files.
+ *
+ * @param sc The SparkContext to associate the RDD with.
+ * @param result Output result
+ * @param carbonLoadModel Carbon load model which contain the load info
+ * @param storePath The store location
+ * @param kettleHomePath The kettle home path
+ * @param columinar whether the data is stored in columnar format
+ * @param loadCount Current load count
+ * @param tableCreationTime Time of creating table
+ * @param schemaLastUpdatedTime Time of last schema update
+ * @param blocksGroupBy Blocks Array which is group by partition or host
+ * @param isTableSplitPartition Whether using table split partition
+ * @tparam K Class of the key associated with the Result.
+ * @tparam V Class of the value associated with the Result.
+ */
+class DataFileLoaderRDD[K, V](
+ sc: SparkContext,
+ result: DataLoadResult[K, V],
+ carbonLoadModel: CarbonLoadModel,
+ storePath: String,
+ kettleHomePath: String,
+ columinar: Boolean,
+ loadCount: Integer,
+ tableCreationTime: Long,
+ schemaLastUpdatedTime: Long,
+ blocksGroupBy: Array[(String, Array[BlockDetails])],
+ isTableSplitPartition: Boolean) extends RDD[(K, V)](sc, Nil) {
+
+ sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+ override def getPartitions: Array[Partition] = {
+ if (isTableSplitPartition) {
+ // for table split partition
+ var splits = Array[TableSplit]()
+ if (carbonLoadModel.isDirectLoad) {
+ splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
+ } else {
+ splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName, null)
+ }
+
+ splits.zipWithIndex.map { case (split, index) =>
+ // filter the same partition unique id, because only one will match, so get 0 element
+ val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter { case (uniqueId, _) =>
+ uniqueId == split.getPartition.getUniqueID
+ }(0)._2
+ new CarbonTableSplitPartition(id, index, split, blocksDetails)
+ }
+ } else {
+ // for node partition
+ blocksGroupBy.zipWithIndex.map { case ((uniqueId, blockDetails), index) =>
+ new CarbonNodePartition(id, index, uniqueId, blockDetails)
+ }
+ }
+ }
+
+ override def checkpoint() {
+ // Do nothing. Hadoop RDD should not be checkpointed.
+ }
+
+ override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val iter = new Iterator[(K, V)] {
+ var partitionID = "0"
+ val loadMetadataDetails = new LoadMetadataDetails()
+ var model: CarbonLoadModel = _
+ var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
+ theSplit.index
+ try {
+ loadMetadataDetails.setPartitionCount(partitionID)
+ carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+ setModelAndBlocksInfo()
+ val loader = new SparkPartitionLoader(model, theSplit.index, storePath,
+ kettleHomePath, loadCount, loadMetadataDetails)
+ loader.initialize()
+ if (model.isRetentionRequest) {
+ recreateAggregationTableForRetention
+ } else if (model.isAggLoadRequest) {
+ loadMetadataDetails.setLoadStatus(createManualAggregateTable)
+ } else {
+ loader.run()
+ }
+ } catch {
+ case e: Exception =>
+ logInfo("DataLoad failure")
+ LOGGER.error(e)
+ throw e
+ }
+
+ def setModelAndBlocksInfo(): Unit = {
+ if (isTableSplitPartition) {
+ // for table split partition
+ val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
+ logInfo("Input split: " + split.serializableHadoopSplit.value)
+ val blocksID = gernerateBlocksID
+ carbonLoadModel.setBlocksID(blocksID)
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ if (carbonLoadModel.isDirectLoad) {
+ model = carbonLoadModel.getCopyWithPartition(
+ split.serializableHadoopSplit.value.getPartition.getUniqueID,
+ split.serializableHadoopSplit.value.getPartition.getFilesPath,
+ carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+ } else {
+ model = carbonLoadModel.getCopyWithPartition(
+ split.serializableHadoopSplit.value.getPartition.getUniqueID)
+ }
+ partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
+ // get this partition data blocks and put it to global static map
+ GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail)
+ StandardLogService.setThreadName(partitionID, null)
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
+ partitionID, split.partitionBlocksDetail.length)
+ } else {
+ // for node partition
+ val split = theSplit.asInstanceOf[CarbonNodePartition]
+ logInfo("Input split: " + split.serializableHadoopSplit)
+ logInfo("The Block Count in this node: " + split.nodeBlocksDetail.length)
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
+ split.serializableHadoopSplit, split.nodeBlocksDetail.length)
+ val blocksID = gernerateBlocksID
+ carbonLoadModel.setBlocksID(blocksID)
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ // set this node blocks info to global static map
+ GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail)
+ if (carbonLoadModel.isDirectLoad) {
+ val filelist: java.util.List[String] = new java.util.ArrayList[String](
+ CarbonCommonConstants.CONSTANT_SIZE_TEN)
+ CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
+ model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
+ carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+ } else {
+ model = carbonLoadModel.getCopyWithPartition(partitionID)
+ }
+ StandardLogService.setThreadName(blocksID, null)
+ }
+ }
+
+ /**
+ * generate blocks id
+ *
+ * @return the generated blocks id
+ */
+ def gernerateBlocksID: String = {
+ if (isTableSplitPartition) {
+ carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+ theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
+ .getPartition.getUniqueID + "_" + UUID.randomUUID()
+ } else {
+ carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+ UUID.randomUUID()
+ }
+ }
+
+ def checkAndLoadAggregationTable: String = {
+ val schema = model.getCarbonDataLoadSchema
+ val aggTables = schema.getCarbonTable.getAggregateTablesName
+ if (null != aggTables && !aggTables.isEmpty) {
+ val details = model.getLoadMetadataDetails.asScala.toArray
+ val newSlice = CarbonCommonConstants.LOAD_FOLDER + loadCount
+ var listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
+ listOfLoadFolders = CarbonLoaderUtil.addNewSliceNameToList(newSlice, listOfLoadFolders)
+ val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
+ var listOfAllLoadFolders = CarbonQueryUtil.getListOfSlices(details)
+ listOfAllLoadFolders = CarbonLoaderUtil
+ .addNewSliceNameToList(newSlice, listOfAllLoadFolders)
+ val copyListOfLoadFolders = listOfLoadFolders.asScala.toList
+ val copyListOfUpdatedLoadFolders = listOfUpdatedLoadFolders.asScala.toList
+ loadTableSlices(listOfAllLoadFolders, details)
+ val loadFolders = Array[String]()
+ loadMetadataDetails.setLoadStatus(iterateOverAggTables(aggTables,
+ copyListOfLoadFolders.asJava, copyListOfUpdatedLoadFolders.asJava, loadFolders))
+ if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+ loadMetadataDetails.getLoadStatus)) {
+ // remove the current slice from memory not the table
+ CarbonLoaderUtil
+ .removeSliceFromMemory(model.getDatabaseName, model.getTableName, newSlice)
+ logInfo(s"Aggregate table creation failed")
+ } else {
+ logInfo("Aggregate tables creation successfull")
+ }
+ }
+ loadMetadataDetails.getLoadStatus
+ }
+
+ def loadTableSlices(listOfAllLoadFolders: java.util.List[String],
+ loadMetadataDetails: Array[LoadMetadataDetails]) = {
+ CarbonProperties.getInstance().addProperty("carbon.cache.used", "false")
+ // TODO: Implement it
+ }
+
+ def createManualAggregateTable: String = {
+ val details = model.getLoadMetadataDetails.asScala.toArray
+ val listOfAllLoadFolders = CarbonQueryUtil.getListOfSlices(details)
+ val listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
+ val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
+ loadTableSlices(listOfAllLoadFolders, details)
+ val loadFolders = Array[String]()
+ val aggTable = model.getAggTableName
+ loadMetadataDetails.setLoadStatus(loadAggregationTable(listOfLoadFolders,
+ listOfUpdatedLoadFolders, loadFolders))
+ if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+ loadMetadataDetails.getLoadStatus)) {
+ logInfo(s"Aggregate table creation failed :: $aggTable")
+ } else {
+ logInfo(s"Aggregate table creation successfull :: $aggTable")
+ }
+ loadMetadataDetails.getLoadStatus
+ }
+
+ def recreateAggregationTableForRetention = {
+ val schema = model.getCarbonDataLoadSchema
+ val aggTables = schema.getCarbonTable.getAggregateTablesName
+ if (null != aggTables && !aggTables.isEmpty) {
+ val details = model.getLoadMetadataDetails.asScala.toArray
+ val listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
+ val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
+ val listOfAllLoadFolder = CarbonQueryUtil.getListOfSlices(details)
+ loadTableSlices(listOfAllLoadFolder, details)
+ val loadFolders = Array[String]()
+ iterateOverAggTables(aggTables, listOfLoadFolders, listOfUpdatedLoadFolders, loadFolders)
+ }
+ }
+
+ // TODO Aggregate table needs to be handled
+ def iterateOverAggTables(aggTables: java.util.List[String],
+ listOfLoadFolders: java.util.List[String],
+ listOfUpdatedLoadFolders: java.util.List[String],
+ loadFolders: Array[String]): String = {
+ model.setAggLoadRequest(true)
+ aggTables.asScala.foreach { aggTable =>
+ model.setAggTableName(aggTable)
+ loadMetadataDetails.setLoadStatus(loadAggregationTable(listOfLoadFolders,
+ listOfUpdatedLoadFolders, loadFolders))
+ if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+ loadMetadataDetails.getLoadStatus)) {
+ logInfo(s"Aggregate table creation failed :: aggTable")
+ return loadMetadataDetails.getLoadStatus
+ }
+ }
+ loadMetadataDetails.getLoadStatus
+ }
+
+ def loadAggregationTable(listOfLoadFolders: java.util.List[String],
+ listOfUpdatedLoadFolders: java.util.List[String],
+ loadFolders: Array[String]): String = {
+ // TODO: Implement it
+ loadMetadataDetails.getLoadStatus
+ }
+
+ var finished = false
+
+ override def hasNext: Boolean = {
+ !finished
+ }
+
+ override def next(): (K, V) = {
+ finished = true
+ result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+ }
+ }
+ iter
+ }
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ if (isTableSplitPartition) {
+ // for table split partition
+ val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
+ val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
+ location
+ } else {
+ // for node partition
+ val theSplit = split.asInstanceOf[CarbonNodePartition]
+ val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+ logInfo("Preferred Location for split: " + firstOptionLocation.head)
+ val blockMap = new util.LinkedHashMap[String, Integer]()
+ val tableBlocks = theSplit.blocksDetails
+ tableBlocks.foreach { tableBlock =>
+ tableBlock.getLocations.foreach { location =>
+ if (!firstOptionLocation.exists(location.equalsIgnoreCase)) {
+ val currentCount = blockMap.get(location)
+ if (currentCount == null) {
+ blockMap.put(location, 1)
+ } else {
+ blockMap.put(location, currentCount + 1)
+ }
+ }
+ }
+ }
+
+ val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
+ nodeCount1.getValue > nodeCount2.getValue
+ }
+ )
+
+ val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+ firstOptionLocation ++ sortedNodesList
+ }
+ }
+
+}
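
The node-partition branch of getPreferredLocations keeps the assigned host first and then appends the two other hosts holding the most block replicas. A toy illustration of that ranking step (host names are made up):

    val blockCounts = Map("host-2" -> 3, "host-3" -> 1, "host-4" -> 2) // replicas per host, assigned host excluded
    val topTwo = blockCounts.toSeq.sortWith(_._2 > _._2).map(_._1).take(2)
    // topTwo == Seq("host-2", "host-4"); prepending the assigned host yields the preferred locations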
+
+/**
+ * Use this RDD class to load data from an RDD of rows (DataFrame load path)
+ *
+ * @param sc The SparkContext to associate the RDD with.
+ * @param result Output result
+ * @param carbonLoadModel Carbon load model which contain the load info
+ * @param storePath The store location
+ * @param kettleHomePath The kettle home path
+ * @param columinar whether the data is stored in columnar format
+ * @param loadCount Current load count
+ * @param tableCreationTime Time of creating table
+ * @param schemaLastUpdatedTime Time of last schema update
+ * @param prev The coalesced input RDD of rows
+ * @tparam K Class of the key associated with the Result.
+ * @tparam V Class of the value associated with the Result.
+ */
+class DataFrameLoaderRDD[K, V](
+ sc: SparkContext,
+ result: DataLoadResult[K, V],
+ carbonLoadModel: CarbonLoadModel,
+ storePath: String,
+ kettleHomePath: String,
+ columinar: Boolean,
+ loadCount: Integer,
+ tableCreationTime: Long,
+ schemaLastUpdatedTime: Long,
+ prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
+
+ sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+ @DeveloperApi
+ override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val resultIter = new Iterator[(K, V)] {
+ var partitionID = "0"
+ val loadMetadataDetails = new LoadMetadataDetails()
+ var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
+ theSplit.index
+ try {
+ loadMetadataDetails.setPartitionCount(partitionID)
+ carbonLoadModel.setPartitionId(partitionID)
+ carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
+ kettleHomePath, loadCount, loadMetadataDetails)
+ loader.initialize()
+ val rddIteratorKey = UUID.randomUUID().toString
+ try {
+ RddInputUtils.put(rddIteratorKey,
+ new PartitionIterator(
+ firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context),
+ carbonLoadModel,
+ context))
+ carbonLoadModel.setRddIteratorKey(rddIteratorKey)
+ loader.run()
+ } finally {
+ RddInputUtils.remove(rddIteratorKey)
+ }
+ } catch {
+ case e: Exception =>
+ logInfo("DataLoad failure")
+ LOGGER.error(e)
+ throw e
+ }
+
+ var finished = false
+
+ override def hasNext: Boolean = !finished
+
+ override def next(): (K, V) = {
+ finished = true
+ result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+ }
+ }
+ resultIter
+ }
+
+ override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+}
+
+class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]],
+ carbonLoadModel: CarbonLoadModel,
+ context: TaskContext) extends JavaRddIterator[JavaRddIterator[Array[String]]] {
+ def hasNext: Boolean = partitionIter.hasNext
+ def next: JavaRddIterator[Array[String]] = {
+ val value = partitionIter.next
+ new RddIterator(value.rdd.iterator(value.partition, context),
+ carbonLoadModel,
+ context)
+ }
+ def initialize: Unit = {
+ SparkUtil.setTaskContext(context)
+ }
+}
+/**
+ * This class wraps a Scala Iterator as a Java Iterator.
+ * It also converts all columns to strings so that the csv data loading flow can be used.
+ *
+ * @param rddIter the underlying Scala iterator of rows
+ * @param carbonLoadModel Carbon load model which contain the load info
+ * @param context the Spark task context
+ */
+class RddIterator(rddIter: Iterator[Row],
+ carbonLoadModel: CarbonLoadModel,
+ context: TaskContext) extends JavaRddIterator[Array[String]] {
+
+ val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+ .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ val format = new SimpleDateFormat(formatString)
+ val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+ val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+ val serializationNullFormat =
+ carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+ def hasNext: Boolean = rddIter.hasNext
+
+ def next: Array[String] = {
+ val row = rddIter.next()
+ val columns = new Array[String](row.length)
+ for (i <- 0 until columns.length) {
+ columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
+ delimiterLevel1, delimiterLevel2, format)
+ }
+ columns
+ }
+
+ def initialize: Unit = {
+ SparkUtil.setTaskContext(context)
+ }
+
+}
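
RddIterator formats timestamp columns with the pattern read from CarbonProperties, falling back to CARBON_TIMESTAMP_DEFAULT_FORMAT. A sketch of tuning it before a load is triggered (the pattern string is only an example):

    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss")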
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
new file mode 100644
index 0000000..0534def
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.LoadMetadataDetails
+import org.apache.carbondata.spark.DeletedLoadResult
+import org.apache.carbondata.spark.load.DeletedLoadMetadata
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+class CarbonDeleteLoadByDateRDD[K, V](
+ sc: SparkContext,
+ result: DeletedLoadResult[K, V],
+ databaseName: String,
+ tableName: String,
+ dateField: String,
+ dateFieldActualName: String,
+ dateValue: String,
+ factTableName: String,
+ dimTableName: String,
+ storePath: String,
+ loadMetadataDetails: List[LoadMetadataDetails])
+ extends RDD[(K, V)](sc, Nil) {
+
+ sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+ override def getPartitions: Array[Partition] = {
+ val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
+ splits.zipWithIndex.map {s =>
+ new CarbonLoadPartition(id, s._2, s._1)
+ }
+ }
+
+ override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+ new Iterator[(K, V)] {
+ val deletedMetaData = new DeletedLoadMetadata()
+ val split = theSplit.asInstanceOf[CarbonLoadPartition]
+ logInfo("Input split: " + split.serializableHadoopSplit.value)
+
+ logInfo("Input split: " + split.serializableHadoopSplit.value)
+ val partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
+
+ // TODO call CARBON delete API
+ logInfo("Applying data retention as per date value " + dateValue)
+ var dateFormat = ""
+ try {
+ dateFormat = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
+ } catch {
+ case e: Exception => logInfo("Unable to parse with default time format " + dateValue)
+ }
+ // TODO: Implement it
+ var finished = false
+
+ override def hasNext: Boolean = {
+ finished
+ }
+
+ override def next(): (K, V) = {
+ result.getKey(null, null)
+ }
+ }
+ }
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ val theSplit = split.asInstanceOf[CarbonLoadPartition]
+ val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
+ logInfo("Host Name: " + s.head + s.length)
+ s
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
new file mode 100644
index 0000000..26e1abc
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+
+import org.apache.carbondata.spark.Value
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+class CarbonDeleteLoadRDD[V: ClassTag](
+ sc: SparkContext,
+ valueClass: Value[V],
+ loadId: Int,
+ databaseName: String,
+ tableName: String,
+ partitioner: Partitioner)
+ extends RDD[V](sc, Nil) {
+ sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+ override def getPartitions: Array[Partition] = {
+ val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
+ splits.zipWithIndex.map {f =>
+ new CarbonLoadPartition(id, f._2, f._1)
+ }
+ }
+
+ override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+ val iter = new Iterator[V] {
+ val split = theSplit.asInstanceOf[CarbonLoadPartition]
+ logInfo("Input split: " + split.serializableHadoopSplit.value)
+ // TODO call CARBON delete API
+
+ var havePair = false
+ var finished = false
+
+ override def hasNext: Boolean = {
+ if (!finished && !havePair) {
+ finished = true
+ havePair = !finished
+ }
+ !finished
+ }
+
+ override def next(): V = {
+ if (!hasNext) {
+ throw new java.util.NoSuchElementException("End of stream")
+ }
+ havePair = false
+ valueClass.getValue(null)
+ }
+
+ }
+ logInfo("********Deleting***************")
+ iter
+ }
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ val theSplit = split.asInstanceOf[CarbonLoadPartition]
+ val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
+ logInfo("Host Name: " + s.head + s.length)
+ s
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
new file mode 100644
index 0000000..dc63098
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+
+import org.apache.carbondata.spark.Value
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+class CarbonDropTableRDD[V: ClassTag](
+ sc: SparkContext,
+ valueClass: Value[V],
+ databaseName: String,
+ tableName: String)
+ extends RDD[V](sc, Nil) {
+
+ sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+ override def getPartitions: Array[Partition] = {
+ val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
+ splits.zipWithIndex.map { s =>
+ new CarbonLoadPartition(id, s._2, s._1)
+ }
+ }
+
+ override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+
+ val iter = new Iterator[V] {
+ // TODO: Clear Btree from memory
+
+ var havePair = false
+ var finished = false
+
+ override def hasNext: Boolean = {
+ if (!finished && !havePair) {
+ finished = true
+ havePair = !finished
+ }
+ !finished
+ }
+
+ override def next(): V = {
+ if (!hasNext) {
+ throw new java.util.NoSuchElementException("End of stream")
+ }
+ havePair = false
+ valueClass.getValue(null)
+ }
+ }
+ iter
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
new file mode 100644
index 0000000..3c15818
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.io.{DataInputStream, InputStreamReader}
+import java.nio.charset.Charset
+import java.text.SimpleDateFormat
+import java.util.regex.Pattern
+
+import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.util.control.Breaks.{break, breakable}
+
+import au.com.bytecode.opencsv.CSVReader
+import org.apache.commons.lang3.{ArrayUtils, StringUtils}
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+import org.apache.carbondata.spark.util.GlobalDictionaryUtil
+
+/**
+ * A partitioner that partitions by column index.
+ *
+ * @constructor create a partitioner
+ * @param numParts the number of partitions
+ */
+class ColumnPartitioner(numParts: Int) extends Partitioner {
+ override def numPartitions: Int = numParts
+
+ override def getPartition(key: Any): Int = key.asInstanceOf[Int]
+}
+
+trait GenericParser {
+ val dimension: CarbonDimension
+
+ def addChild(child: GenericParser): Unit
+
+ def parseString(input: String): Unit
+}
+
+case class DictionaryStats(distinctValues: java.util.List[String],
+ dictWriteTime: Long, sortIndexWriteTime: Long)
+
+case class PrimitiveParser(dimension: CarbonDimension,
+ setOpt: Option[HashSet[String]]) extends GenericParser {
+ val (hasDictEncoding, set: HashSet[String]) = setOpt match {
+ case None => (false, new HashSet[String])
+ case Some(x) => (true, x)
+ }
+
+ def addChild(child: GenericParser): Unit = {
+ }
+
+ def parseString(input: String): Unit = {
+ if (hasDictEncoding && input != null) {
+ set.add(input)
+ }
+ }
+}
+
+case class ArrayParser(dimension: CarbonDimension, format: DataFormat) extends GenericParser {
+ var children: GenericParser = _
+
+ def addChild(child: GenericParser): Unit = {
+ children = child
+ }
+
+ def parseString(input: String): Unit = {
+ if (StringUtils.isNotEmpty(input)) {
+ val splits = format.getSplits(input)
+ if (ArrayUtils.isNotEmpty(splits)) {
+ splits.foreach { s =>
+ children.parseString(s)
+ }
+ }
+ }
+ }
+}
+
+case class StructParser(dimension: CarbonDimension,
+ format: DataFormat) extends GenericParser {
+ val children = new ArrayBuffer[GenericParser]
+
+ def addChild(child: GenericParser): Unit = {
+ children += child
+ }
+
+ def parseString(input: String): Unit = {
+ if (StringUtils.isNotEmpty(input)) {
+ val splits = format.getSplits(input)
+ val len = Math.min(children.length, splits.length)
+ for (i <- 0 until len) {
+ children(i).parseString(splits(i))
+ }
+ }
+ }
+}
+
+case class DataFormat(delimiters: Array[String],
+ var delimiterIndex: Int,
+ patterns: Array[Pattern]) extends Serializable {
+ self =>
+ def getSplits(input: String): Array[String] = {
+ // -1 so that, if the last column after splitting is empty, a surrogate key is still
+ // generated for the empty value
+ patterns(delimiterIndex).split(input, -1)
+ }
+
+ def cloneAndIncreaseIndex: DataFormat = {
+ DataFormat(delimiters, Math.min(delimiterIndex + 1, delimiters.length - 1), patterns)
+ }
+}
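
Two details above are worth a small illustration: the -1 split limit that keeps trailing empty values, and how the parsers compose for a complex (array) column. A sketch, e.g. pasted into a Spark shell; the '$' delimiter is an example, and the null dimension is a stand-in since parseString never touches it (a real load supplies CarbonDimension instances from the table schema):

    import java.util.regex.Pattern
    import scala.collection.mutable.HashSet

    val level1 = DataFormat(Array("$"), 0, Array(Pattern.compile("\\$")))
    level1.getSplits("a$b$")               // Array("a", "b", "") -- trailing empty value kept
    Pattern.compile("\\$").split("a$b$")   // Array("a", "b")     -- default limit drops it

    val set = new HashSet[String]
    val primitive = PrimitiveParser(null, Some(set))   // collects distinct primitive values
    val array = ArrayParser(null, level1)              // splits on '$' and delegates to its child
    array.addChild(primitive)
    array.parseString("a$b$a")
    // set now holds the distinct values "a" and "b"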
+
+/**
+ * A case class that packages the attributes needed for dictionary loading.
+ */
+case class DictionaryLoadModel(table: CarbonTableIdentifier,
+ dimensions: Array[CarbonDimension],
+ hdfsLocation: String,
+ dictfolderPath: String,
+ dictFilePaths: Array[String],
+ dictFileExists: Array[Boolean],
+ isComplexes: Array[Boolean],
+ primDimensions: Array[CarbonDimension],
+ delimiters: Array[String],
+ highCardIdentifyEnable: Boolean,
+ highCardThreshold: Int,
+ rowCountPercentage: Double,
+ columnIdentifier: Array[ColumnIdentifier],
+ isFirstLoad: Boolean,
+ hdfsTempLocation: String,
+ lockType: String,
+ zooKeeperUrl: String,
+ serializationNullFormat: String) extends Serializable
+
+case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable
+
+/**
+ * An RDD to combine all dictionary distinct values.
+ *
+ * @constructor create an RDD from an RDD[(String, Iterable[String])]
+ * @param prev the input RDD[(String, Iterable[String])]
+ * @param model a model packaging the load info
+ */
+class CarbonAllDictionaryCombineRDD(
+ prev: RDD[(String, Iterable[String])],
+ model: DictionaryLoadModel)
+ extends RDD[(Int, ColumnDistinctValues)](prev) {
+
+ override def getPartitions: Array[Partition] = {
+ firstParent[(String, Iterable[String])].partitions
+ }
+
+ override def compute(split: Partition, context: TaskContext
+ ): Iterator[(Int, ColumnDistinctValues)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
+ /*
+ * for the all-dictionary flow, all columns need encoding and the
+ * isHighCardinalityColumn check is not applied, so there is no need to calculate the row count
+ */
+ val rowCount = 0L
+ try {
+ val dimensionParsers =
+ GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList)
+ val dimNum = model.dimensions.length
+ // Map[dimColName -> dimColNameIndex]
+ val columnIndexMap = new HashMap[String, Int]()
+ for (j <- 0 until dimNum) {
+ columnIndexMap.put(model.dimensions(j).getColName, j)
+ }
+
+ var row: (String, Iterable[String]) = null
+ val rddIter = firstParent[(String, Iterable[String])].iterator(split, context)
+ // generate block distinct value set
+ while (rddIter.hasNext) {
+ row = rddIter.next()
+ if (row != null) {
+ columnIndexMap.get(row._1) match {
+ case Some(index) =>
+ for (record <- row._2) {
+ dimensionParsers(index).parseString(record)
+ }
+ case None =>
+ }
+ }
+ }
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex)
+ throw ex
+ }
+
+ distinctValuesList.map { iter =>
+ val valueList = iter._2.toArray
+ (iter._1, ColumnDistinctValues(valueList, rowCount))
+ }.iterator
+ }
+}
+
+/**
+ * An RDD to combine the distinct values within a block.
+ *
+ * @constructor create an RDD from an RDD[Row]
+ * @param prev the input RDD[Row]
+ * @param model a model packaging the load info
+ */
+class CarbonBlockDistinctValuesCombineRDD(
+ prev: RDD[Row],
+ model: DictionaryLoadModel)
+ extends RDD[(Int, ColumnDistinctValues)](prev) {
+
+ override def getPartitions: Array[Partition] = firstParent[Row].partitions
+
+ override def compute(split: Partition,
+ context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
+ val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
+ var rowCount = 0L
+ try {
+ val dimensionParsers =
+ GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList)
+ val dimNum = model.dimensions.length
+ var row: Row = null
+ val rddIter = firstParent[Row].iterator(split, context)
+ val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+ .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ val format = new SimpleDateFormat(formatString)
+ // generate block distinct value set
+ while (rddIter.hasNext) {
+ row = rddIter.next()
+ if (row != null) {
+ rowCount += 1
+ for (i <- 0 until dimNum) {
+ dimensionParsers(i).parseString(CarbonScalaUtil.getString(row.get(i),
+ model.serializationNullFormat, model.delimiters(0), model.delimiters(1), format))
+ }
+ }
+ }
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex)
+ throw ex
+ }
+
+ distinctValuesList.map { iter =>
+ val valueList = iter._2.toArray
+ (iter._1, ColumnDistinctValues(valueList, rowCount))
+ }.iterator
+ }
+}
+
+/**
+ * An RDD to generate a dictionary file for each column
+ *
+ * @constructor create an RDD from an RDD[(Int, ColumnDistinctValues)]
+ * @param prev the input RDD[(Int, ColumnDistinctValues)]
+ * @param model a model packaging the load info
+ */
+class CarbonGlobalDictionaryGenerateRDD(
+ prev: RDD[(Int, ColumnDistinctValues)],
+ model: DictionaryLoadModel)
+ extends RDD[(Int, String, Boolean)](prev) {
+
+ override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
+
+ override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+ var isHighCardinalityColumn = false
+ val iter = new Iterator[(Int, String, Boolean)] {
+ var dictionaryForDistinctValueLookUp:
+ org.apache.carbondata.core.cache.dictionary.Dictionary = _
+ var dictionaryForSortIndexWriting: org.apache.carbondata.core.cache.dictionary.Dictionary = _
+ var dictionaryForDistinctValueLookUpCleared: Boolean = false
+ val pathService = CarbonCommonFactory.getPathService
+ val carbonTablePath = pathService.getCarbonTablePath(model.hdfsLocation, model.table)
+ if (StringUtils.isNotBlank(model.hdfsTempLocation)) {
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
+ model.hdfsTempLocation)
+ }
+ if (StringUtils.isNotBlank(model.lockType)) {
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
+ model.lockType)
+ }
+ if (StringUtils.isNotBlank(model.zooKeeperUrl)) {
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
+ model.zooKeeperUrl)
+ }
+ val dictLock = CarbonLockFactory
+ .getCarbonLockObj(carbonTablePath.getRelativeDictionaryDirectory,
+ model.columnIdentifier(split.index).getColumnId + LockUsage.LOCK)
+ var isDictionaryLocked = false
+ // generate distinct value list
+ try {
+ val t1 = System.currentTimeMillis
+ val valuesBuffer = new mutable.HashSet[String]
+ val rddIter = firstParent[(Int, ColumnDistinctValues)].iterator(split, context)
+ var rowCount = 0L
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
+ breakable {
+ while (rddIter.hasNext) {
+ val distinctValueList = rddIter.next()._2
+ valuesBuffer ++= distinctValueList.values
+ rowCount += distinctValueList.rowCount
+ // check high cardinality
+ if (model.isFirstLoad && model.highCardIdentifyEnable
+ && !model.isComplexes(split.index)
+ && model.dimensions(split.index).isColumnar) {
+ isHighCardinalityColumn = GlobalDictionaryUtil.isHighCardinalityColumn(
+ valuesBuffer.size, rowCount, model)
+ if (isHighCardinalityColumn) {
+ break
+ }
+ }
+ }
+ }
+ val combineListTime = System.currentTimeMillis() - t1
+ if (isHighCardinalityColumn) {
+ LOGGER.info(s"column ${ model.table.getTableUniqueName }." +
+ s"${
+ model.primDimensions(split.index)
+ .getColName
+ } is high cardinality column")
+ } else {
+ isDictionaryLocked = dictLock.lockWithRetries()
+ if (isDictionaryLocked) {
+ logInfo(s"Successfully able to get the dictionary lock for ${
+ model.primDimensions(split.index).getColName
+ }")
+ } else {
+ sys
+ .error(s"Dictionary file ${
+ model.primDimensions(split.index).getColName
+ } is locked for update. Please try again after some time")
+ }
+ val t2 = System.currentTimeMillis
+ val fileType = FileFactory.getFileType(model.dictFilePaths(split.index))
+ model.dictFileExists(split.index) = FileFactory
+ .isFileExist(model.dictFilePaths(split.index), fileType)
+ dictionaryForDistinctValueLookUp = if (model.dictFileExists(split.index)) {
+ CarbonLoaderUtil.getDictionary(model.table,
+ model.columnIdentifier(split.index),
+ model.hdfsLocation,
+ model.primDimensions(split.index).getDataType
+ )
+ } else {
+ null
+ }
+ val dictCacheTime = System.currentTimeMillis - t2
+ val t3 = System.currentTimeMillis()
+ val dictWriteTask = new DictionaryWriterTask(valuesBuffer,
+ dictionaryForDistinctValueLookUp,
+ model,
+ split.index)
+ // execute dictionary writer task to get distinct values
+ val distinctValues = dictWriteTask.execute()
+ val dictWriteTime = System.currentTimeMillis() - t3
+ val t4 = System.currentTimeMillis()
+ // if new data came in, then rewrite the sort index file
+ if (distinctValues.size() > 0) {
+ val sortIndexWriteTask = new SortIndexWriterTask(model,
+ split.index,
+ dictionaryForDistinctValueLookUp,
+ distinctValues)
+ sortIndexWriteTask.execute()
+ }
+ val sortIndexWriteTime = System.currentTimeMillis() - t4
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
+ // After sortIndex writing, update dictionaryMeta
+ dictWriteTask.updateMetaData()
+ // clear the value buffer after writing dictionary data
+ valuesBuffer.clear
+ CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
+ dictionaryForDistinctValueLookUpCleared = true
+ LOGGER.info(s"\n columnName: ${ model.primDimensions(split.index).getColName }" +
+ s"\n columnId: ${ model.primDimensions(split.index).getColumnId }" +
+ s"\n new distinct values count: ${ distinctValues.size() }" +
+ s"\n combine lists: $combineListTime" +
+ s"\n create dictionary cache: $dictCacheTime" +
+ s"\n sort list, distinct and write: $dictWriteTime" +
+ s"\n write sort info: $sortIndexWriteTime")
+ }
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex)
+ throw ex
+ } finally {
+ if (!dictionaryForDistinctValueLookUpCleared) {
+ CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
+ }
+ CarbonUtil.clearDictionaryCache(dictionaryForSortIndexWriting)
+ if (dictLock != null && isDictionaryLocked) {
+ if (dictLock.unlock()) {
+ logInfo(s"Dictionary ${
+ model.primDimensions(split.index).getColName
+ } Unlocked Successfully.")
+ } else {
+ logError(s"Unable to unlock Dictionary ${
+ model.primDimensions(split.index).getColName
+ }")
+ }
+ }
+ }
+ var finished = false
+
+ override def hasNext: Boolean = {
+ if (!finished) {
+ finished = true
+ finished
+ } else {
+ !finished
+ }
+ }
+
+ override def next(): (Int, String, Boolean) = {
+ (split.index, status, isHighCardinalityColumn)
+ }
+ }
+
+ iter
+ }
+
+}
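
A rough sketch of how these RDDs are presumably chained for global dictionary generation (the input RDD[Row] and the DictionaryLoadModel are placeholders prepared elsewhere, e.g. by GlobalDictionaryUtil):

    val blockDistinct = new CarbonBlockDistinctValuesCombineRDD(csvRowsRdd, model)
    val statuses = new CarbonGlobalDictionaryGenerateRDD(
      blockDistinct.partitionBy(new ColumnPartitioner(model.primDimensions.length)),
      model).collect()
    // each element is (columnIndex, loadStatus, isHighCardinalityColumn)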
+
+/**
+ * Partition used for external column dictionary generation
+ *
+ * @param id partition id
+ * @param dimension current carbon dimension
+ */
+class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
+ extends Partition {
+ override val index: Int = id
+ val preDefDictDimension = dimension
+}
+
+
+/**
+ * Use an external column dictionary to generate the global dictionary
+ *
+ * @param carbonLoadModel carbon load model
+ * @param sparkContext spark context
+ * @param table carbon table identifier
+ * @param dimensions carbon dimensions having a predefined dictionary
+ * @param hdfsLocation carbon base store path
+ * @param dictFolderPath path of dictionary folder
+ */
+class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
+ dictionaryLoadModel: DictionaryLoadModel,
+ sparkContext: SparkContext,
+ table: CarbonTableIdentifier,
+ dimensions: Array[CarbonDimension],
+ hdfsLocation: String,
+ dictFolderPath: String)
+ extends RDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
+
+ override def getPartitions: Array[Partition] = {
+ val primDimensions = dictionaryLoadModel.primDimensions
+ val primDimLength = primDimensions.length
+ val result = new Array[Partition](primDimLength)
+ for (i <- 0 until primDimLength) {
+ result(i) = new CarbonColumnDictPatition(i, primDimensions(i))
+ }
+ result
+ }
+
+ override def compute(split: Partition, context: TaskContext)
+ : Iterator[(Int, ColumnDistinctValues)] = {
+ val theSplit = split.asInstanceOf[CarbonColumnDictPatition]
+ val primDimension = theSplit.preDefDictDimension
+ // read the column dict data
+ val preDefDictFilePath = carbonLoadModel.getPredefDictFilePath(primDimension)
+ var csvReader: CSVReader = null
+ var inputStream: DataInputStream = null
+ var colDictData: java.util.Iterator[Array[String]] = null
+ try {
+ inputStream = FileFactory.getDataInputStream(preDefDictFilePath,
+ FileFactory.getFileType(preDefDictFilePath))
+ csvReader = new CSVReader(new InputStreamReader(inputStream, Charset.defaultCharset),
+ carbonLoadModel.getCsvDelimiter.charAt(0))
+ // read the column data to list iterator
+ colDictData = csvReader.readAll.iterator
+ } catch {
+ case ex: Exception =>
+ logError(s"Error in reading pre-defined " +
+ s"dictionary file:${ ex.getMessage }")
+ throw ex
+ } finally {
+ if (csvReader != null) {
+ try {
+ csvReader.close()
+ } catch {
+ case ex: Exception =>
+ logError(s"Error in closing csvReader of " +
+ s"pre-defined dictionary file:${ ex.getMessage }")
+ }
+ }
+ if (inputStream != null) {
+ try {
+ inputStream.close()
+ } catch {
+ case ex: Exception =>
+ logError(s"Error in closing inputStream of " +
+ s"pre-defined dictionary file:${ ex.getMessage }")
+ }
+ }
+ }
+ val mapIdWithSet = new HashMap[String, HashSet[String]]
+ val columnValues = new HashSet[String]
+ val distinctValues = (theSplit.index, columnValues)
+ mapIdWithSet.put(primDimension.getColumnId, columnValues)
+ // use parser to generate new dict value
+ val dimensionParser = GlobalDictionaryUtil.generateParserForDimension(
+ Some(primDimension),
+ GlobalDictionaryUtil.createDataFormat(carbonLoadModel.getDelimiters),
+ mapIdWithSet).get
+ // parse the column data
+ while (colDictData.hasNext) {
+ dimensionParser.parseString(colDictData.next()(0))
+ }
+ Array((distinctValues._1,
+ ColumnDistinctValues(distinctValues._2.toArray, 0L))).iterator
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
new file mode 100644
index 0000000..c9e3b6b
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util
+import java.util.{Collections, List}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
+import org.apache.spark.sql.hive.DistributionUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException}
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.scan.result.iterator.RawResultIterator
+import org.apache.carbondata.spark.MergeResult
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger}
+import org.apache.carbondata.spark.splits.TableSplit
+
+class CarbonMergerRDD[K, V](
+ sc: SparkContext,
+ result: MergeResult[K, V],
+ carbonLoadModel: CarbonLoadModel,
+ carbonMergerMapping: CarbonMergerMapping,
+ confExecutorsTemp: String)
+ extends RDD[(K, V)](sc, Nil) {
+
+ sc.setLocalProperty("spark.scheduler.pool", "DDL")
+ sc.setLocalProperty("spark.job.interruptOnCancel", "true")
+
+ var storeLocation: String = null
+ val storePath = carbonMergerMapping.storePath
+ val metadataFilePath = carbonMergerMapping.metadataFilePath
+ val mergedLoadName = carbonMergerMapping.mergedLoadName
+ val databaseName = carbonMergerMapping.databaseName
+ val factTableName = carbonMergerMapping.factTableName
+ val tableId = carbonMergerMapping.tableId
+
+ override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val iter = new Iterator[(K, V)] {
+
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ val tempLocationKey: String = CarbonCommonConstants
+ .COMPACTION_KEY_WORD + '_' + carbonLoadModel
+ .getDatabaseName + '_' + carbonLoadModel
+ .getTableName + '_' + carbonLoadModel.getTaskNo
+
+ // this property is used to determine whether temp location for carbon is inside
+ // container temp dir or is yarn application directory.
+ val carbonUseLocalDir = CarbonProperties.getInstance()
+ .getProperty("carbon.use.local.dir", "false")
+
+ if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+
+ val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+ if (null != storeLocations && storeLocations.nonEmpty) {
+ storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+ }
+ if (storeLocation == null) {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
+ } else {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
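+ // make the temp location unique per task so that concurrent compaction tasks do not collide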
+ storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
+ CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+ LOGGER.info(s"Temp storeLocation taken is $storeLocation")
+ var mergeStatus = false
+ var mergeNumber = ""
+ var exec: CarbonCompactionExecutor = null
+ try {
+ val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
+
+ // get the destination segment properties sent from the driver; these are taken from the last segment.
+
+ val segmentProperties = new SegmentProperties(
+ carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
+ carbonMergerMapping.maxSegmentColCardinality)
+
+ // sorting the table block info List.
+ val splitList = carbonSparkPartition.split.value.getAllSplits
+ val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
+
+ Collections.sort(tableBlockInfoList)
+
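+ // group the sorted blocks segment-wise: segment id -> task-wise block info and
+ // segment id -> data file footers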
+ val segmentMapping: java.util.Map[String, TaskBlockInfo] =
+ CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
+
+ val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
+ CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
+
+ carbonLoadModel.setStorePath(storePath)
+
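+ // executor which fires the scan query over the blocks of the segments being compacted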
+ exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
+ factTableName, storePath, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+ dataFileMetadataSegMapping
+ )
+
+ // fire a query and get the results.
+ var result2: util.List[RawResultIterator] = null
+ try {
+ result2 = exec.processTableBlocks()
+ } catch {
+ case e: Throwable =>
+ if (null != exec) {
+ exec.finish()
+ }
+ LOGGER.error(e)
+ if (null != e.getMessage) {
+ sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
+ } else {
+ sys.error("Exception occurred in query execution.Please check logs.")
+ }
+ }
+ mergeNumber = mergedLoadName
+ .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
+ CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
+ )
+
+ val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
+ factTableName,
+ carbonLoadModel.getTaskNo,
+ "0",
+ mergeNumber,
+ true
+ )
+
+ carbonLoadModel.setSegmentId(mergeNumber)
+ carbonLoadModel.setPartitionId("0")
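+ // merge the sorted result iterators of the source segments into the new merged segment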
+ val merger =
+ new RowResultMerger(result2,
+ databaseName,
+ factTableName,
+ segmentProperties,
+ tempStoreLoc,
+ carbonLoadModel,
+ carbonMergerMapping.maxSegmentColCardinality
+ )
+ mergeStatus = merger.mergerSlice()
+
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e)
+ throw e
+ } finally {
+ // delete temp location data
+ val newSlice = CarbonCommonConstants.LOAD_FOLDER + mergeNumber
+ try {
+ val isCompactionFlow = true
+ CarbonLoaderUtil
+ .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow)
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e)
+ }
+ if (null != exec) {
+ exec.finish()
+ }
+ }
+
+ var finished = false
+
+ // the iterator yields exactly one (key, status) pair; next() marks it as consumed.
+ override def hasNext: Boolean = !finished
+
+ override def next(): (K, V) = {
+ finished = true
+ result.getKey(0, mergeStatus)
+ }
+
+ }
+ iter
+ }
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ val theSplit = split.asInstanceOf[CarbonSparkPartition]
+ theSplit.split.value.getLocations.filter(_ != "localhost")
+ }
+
+ override def getPartitions: Array[Partition] = {
+ val startTime = System.currentTimeMillis()
+ val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
+ storePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
+ )
+ val jobConf: JobConf = new JobConf(new Configuration)
+ val job: Job = new Job(jobConf)
+ val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+ var defaultParallelism = sparkContext.defaultParallelism
+ val result = new util.ArrayList[Partition](defaultParallelism)
+
+ // mapping of the node and block list.
+ var nodeBlockMapping: util.Map[String, util.List[Distributable]] = new
+ util.HashMap[String, util.List[Distributable]]
+
+ var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
+
+ // for each valid segment.
+ for (eachSeg <- carbonMergerMapping.validSegments) {
+
+ // restrict the input format to the current segment.
+ job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
+
+ // get splits
+ val splits = format.getSplits(job)
+ carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+ }
+
+ // prepare the details required to extract the segment properties using last segment.
+ if (null != carbonInputSplits && carbonInputSplits.nonEmpty) {
+ val carbonInputSplit = carbonInputSplits.last
+ var dataFileFooter: DataFileFooter = null
+
+ try {
+ dataFileFooter = CarbonUtil.readMetadatFile(carbonInputSplit.getPath.toString(),
+ carbonInputSplit.getStart, carbonInputSplit.getLength)
+ } catch {
+ case e: CarbonUtilException =>
+ logError("Exception in preparing the data file footer for compaction " + e.getMessage)
+ throw e
+ }
+
+ carbonMergerMapping.maxSegmentColCardinality = dataFileFooter.getSegmentInfo
+ .getColumnCardinality
+ carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
+ .toList
+ }
+ // send complete list of blocks to the mapping util.
+ nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(
+ carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava, -1)
+
+ val confExecutors = confExecutorsTemp.toInt
+ val requiredExecutors = if (nodeBlockMapping.size > confExecutors) {
+ confExecutors
+ } else { nodeBlockMapping.size() }
+ DistributionUtil.ensureExecutors(sparkContext, requiredExecutors)
+ logInfo("No.of Executors required=" + requiredExecutors +
+ " , spark.executor.instances=" + confExecutors +
+ ", no.of.nodes where data present=" + nodeBlockMapping.size())
+ var nodes = DistributionUtil.getNodeList(sparkContext)
+ var maxTimes = 30
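+ // wait (at most 30 * 500 ms) for the requested executors to be allocated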
+ while (nodes.length < requiredExecutors && maxTimes > 0) {
+ Thread.sleep(500)
+ nodes = DistributionUtil.getNodeList(sparkContext)
+ maxTimes = maxTimes - 1
+ }
+ logInfo("Time taken to wait for executor allocation is =" + ((30 - maxTimes) * 500) + "millis")
+ defaultParallelism = sparkContext.defaultParallelism
+ var i = 0
+
+ val nodeTaskBlocksMap = new util.HashMap[String, util.List[NodeInfo]]()
+
+ // Create Spark Partition for each task and assign blocks
+ nodeBlockMapping.asScala.foreach { case (nodeName, blockList) =>
+ val taskBlockList = new util.ArrayList[NodeInfo](0)
+ nodeTaskBlocksMap.put(nodeName, taskBlockList)
+ var blockletCount = 0
+ blockList.asScala.foreach { taskInfo =>
+ val blocksPerNode = taskInfo.asInstanceOf[CarbonInputSplit]
+ blockletCount = blockletCount + blocksPerNode.getNumberOfBlocklets
+ taskBlockList.add(
+ NodeInfo(blocksPerNode.taskId, blocksPerNode.getNumberOfBlocklets))
+ }
+ if (blockletCount != 0) {
+ val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
+ carbonInputSplits.asJava, nodeName)
+ result.add(new CarbonSparkPartition(id, i, multiBlockSplit))
+ i += 1
+ }
+ }
+
+ // print the node info along with task and number of blocks for the task.
+
+ nodeTaskBlocksMap.asScala.foreach((entry: (String, List[NodeInfo])) => {
+ logInfo(s"for the node ${ entry._1 }")
+ for (elem <- entry._2.asScala) {
+ logInfo("Task ID is " + elem.TaskId + "no. of blocks is " + elem.noOfBlocks)
+ }
+ })
+
+ val noOfBlocks = carbonInputSplits.size
+ val noOfNodes = nodes.length
+ val noOfTasks = result.size
+ logInfo(s"Identified no.of.Blocks: $noOfBlocks, " +
+ s"parallelism: $defaultParallelism, no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks")
+ logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
+ for (j <- 0 until result.size ) {
+ val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
+ val splitList = multiBlockSplit.getAllSplits
+ logInfo(s"Node: ${multiBlockSplit.getLocations.mkString(",")}, No.Of Blocks: " +
+ s"${CarbonInputSplit.createBlocks(splitList).size}")
+ }
+ result.toArray(new Array[Partition](result.size))
+ }
+
+}
+
+class CarbonLoadPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit)
+ extends Partition {
+
+ override val index: Int = idx
+ val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit)
+
+ override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
new file mode 100644
index 0000000..82a471f
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import org.apache.spark.{Partition, SerializableWritable}
+
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
+
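+/**
+ * Spark partition which carries the CarbonMultiBlockSplit (the set of blocks one task scans)
+ * in serializable form.
+ */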
+class CarbonSparkPartition(
+ val rddId: Int,
+ val idx: Int,
+ @transient val multiBlockSplit: CarbonMultiBlockSplit)
+ extends Partition {
+
+ val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)
+
+ override val index: Int = idx
+
+ override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
new file mode 100644
index 0000000..fe805fe
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
+
+/**
+ * Compactor which handles the different compaction cases.
+ */
+object Compactor {
+
+ val logger = LogServiceFactory.getLogService(Compactor.getClass.getName)
+
+ def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
+
+ val storePath = compactionCallableModel.storePath
+ val storeLocation = compactionCallableModel.storeLocation
+ val carbonTable = compactionCallableModel.carbonTable
+ val kettleHomePath = compactionCallableModel.kettleHomePath
+ val cubeCreationTime = compactionCallableModel.cubeCreationTime
+ val loadsToMerge = compactionCallableModel.loadsToMerge
+ val sc = compactionCallableModel.sqlContext
+ val carbonLoadModel = compactionCallableModel.carbonLoadModel
+ val compactionType = compactionCallableModel.compactionType
+
+ val startTime = System.nanoTime()
+ val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
+ var finalMergeStatus = false
+ val schemaName: String = carbonLoadModel.getDatabaseName
+ val factTableName = carbonLoadModel.getTableName
+ val validSegments: Array[String] = CarbonDataMergerUtil
+ .getValidSegments(loadsToMerge).split(',')
+ val mergeLoadStartTime = CarbonLoaderUtil.readCurrentTime()
+ val carbonMergerMapping = CarbonMergerMapping(storeLocation,
+ storePath,
+ carbonTable.getMetaDataFilepath,
+ mergedLoadName,
+ kettleHomePath,
+ cubeCreationTime,
+ schemaName,
+ factTableName,
+ validSegments,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+ maxSegmentColCardinality = null,
+ maxSegmentColumnSchemaList = null
+ )
+ carbonLoadModel.setStorePath(carbonMergerMapping.storePath)
+ carbonLoadModel.setLoadMetadataDetails(
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
+ var execInstance = "1"
+ // in case of non dynamic executor allocation, the number of executors is fixed.
+ if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
+ execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
+ logger.info(s"spark.executor.instances property is set to $execInstance")
+ } else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) {
+ // in case of dynamic executor allocation, take the max executors of the dynamic allocation.
+ if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
+ .equalsIgnoreCase("true")) {
+ execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
+ logger.info(s"spark.dynamicAllocation.maxExecutors property is set to $execInstance")
+ }
+ }
+
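+ // run the merge as a distributed job; each task reports whether its slice merged successfully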
+ val mergeStatus = new CarbonMergerRDD(
+ sc.sparkContext,
+ new MergeResultImpl(),
+ carbonLoadModel,
+ carbonMergerMapping,
+ execInstance
+ ).collect
+
+ finalMergeStatus = mergeStatus.nonEmpty && mergeStatus.forall(_._2)
+
+ if (finalMergeStatus) {
+ val endTime = System.nanoTime()
+ logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
+ if (!CarbonDataMergerUtil
+ .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
+ mergedLoadName, carbonLoadModel, mergeLoadStartTime, compactionType
+ )) {
+ logger.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+ s"${ carbonLoadModel.getTableName }")
+ logger.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+ s"${ carbonLoadModel.getTableName }")
+ throw new Exception(s"Compaction failed to update metadata for table" +
+ s" ${ carbonLoadModel.getDatabaseName }." +
+ s"${ carbonLoadModel.getTableName }")
+ } else {
+ logger.audit(s"Compaction request completed for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ logger.info("Compaction request completed for table ${ carbonLoadModel.getDatabaseName } " +
+ s".${ carbonLoadModel.getTableName }")
+ }
+ } else {
+ logger.audit("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " +
+ s".${ carbonLoadModel.getTableName }"
+ )
+ logger.error("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " +
+ s".${ carbonLoadModel.getTableName }")
+ throw new Exception("Compaction Failure in Merger Rdd.")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
new file mode 100644
index 0000000..7395e43
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+
+case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
+
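+/**
+ * RDD which coalesces the partitions of the parent RDD onto the given node list without a
+ * shuffle; each output partition wraps the parent partitions assigned to it.
+ */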
+class DataLoadCoalescedRDD[T: ClassTag](
+ @transient var prev: RDD[T],
+ nodeList: Array[String])
+ extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
+
+ override def getPartitions: Array[Partition] = {
+ new DataLoadPartitionCoalescer(prev, nodeList).run
+ }
+
+ override def compute(split: Partition,
+ context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
+
+ new Iterator[DataLoadPartitionWrap[T]] {
+ val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator
+ def hasNext = iter.hasNext
+ def next: DataLoadPartitionWrap[T] = {
+ DataLoadPartitionWrap(firstParent[T], iter.next())
+ }
+ }
+ }
+
+ override def getDependencies: Seq[Dependency[_]] = {
+ Seq(new NarrowDependency(prev) {
+ def getParents(id: Int): Seq[Int] =
+ partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
+ })
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ prev = null
+ }
+
+ /**
+ * Returns the preferred machine for the partition. If the partition is a CoalescedRDDPartition,
+ * the preferred machine is the one that most of its parent splits prefer as well.
+ * @param partition the coalesced partition
+ * @return the machine most preferred by the partition
+ */
+ override def getPreferredLocations(partition: Partition): Seq[String] = {
+ partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
+ }
+}