Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/12/05 09:36:36 UTC
[2/3] incubator-carbondata git commit: modify all RDD
modify all RDD
rebase
fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1a1b18d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1a1b18d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1a1b18d0
Branch: refs/heads/master
Commit: 1a1b18d0294908d94f1fa1ba2b02e8dabb38c5a5
Parents: e642ceb
Author: jackylk <ja...@huawei.com>
Authored: Fri Dec 2 23:54:49 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Mon Dec 5 17:35:11 2016 +0800
----------------------------------------------------------------------
.../carbondata/hadoop/CarbonInputFormat.java | 4 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 234 +++++++++++
.../spark/rdd/DataManagementFunc.scala | 371 +++++++++++++++++
.../spark/rdd/NewCarbonDataLoadRDD.scala | 310 +++++++++++++++
.../carbondata/spark/rdd/SparkCommonEnv.scala | 30 ++
.../spark/sql/hive/DistributionUtil.scala | 12 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 349 +---------------
.../carbondata/spark/rdd/CarbonScanRDD.scala | 255 ------------
.../spark/rdd/NewCarbonDataLoadRDD.scala | 276 -------------
.../apache/spark/mapred/SparkMapRedUtil.scala | 32 --
.../sql/CarbonDatasourceHadoopRelation.scala | 2 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 18 +-
.../scala/org/apache/spark/sql/CarbonScan.scala | 7 +-
.../execution/command/carbonTableSchema.scala | 9 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 397 ++-----------------
.../carbondata/spark/rdd/CarbonScanRDD.scala | 250 ------------
.../spark/rdd/NewCarbonDataLoadRDD.scala | 312 ---------------
.../sql/CarbonDatasourceHadoopRelation.scala | 2 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 19 +-
.../execution/command/carbonTableSchema.scala | 7 +-
20 files changed, 1024 insertions(+), 1872 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index e707c4e..66f0e3b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -166,8 +166,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
return configuration.get(COLUMN_PROJECTION);
}
- public static void setCarbonReadSupport(Class<? extends CarbonReadSupport> readSupportClass,
- Configuration configuration) {
+ public static void setCarbonReadSupport(Configuration configuration,
+ Class<? extends CarbonReadSupport> readSupportClass) {
if (readSupportClass != null) {
configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
}
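For callers, only the argument order changes: the Configuration now comes first, consistent with the other static setters used later in this commit (setTablePath, setFilterPredicates, setColumnProjection). A minimal sketch of the new call shape, mirroring prepareInputFormatForExecutor in the new CarbonScanRDD below; the values themselves are illustrative:

    import org.apache.hadoop.conf.Configuration
    import org.apache.carbondata.hadoop.CarbonInputFormat
    import org.apache.carbondata.spark.rdd.SparkCommonEnv

    val conf = new Configuration()
    // Configuration first, then the CarbonReadSupport class to register.
    CarbonInputFormat.setCarbonReadSupport(conf, SparkCommonEnv.readSupportClass)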
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
new file mode 100644
index 0000000..a750b10
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{InputSplit, Job, JobID, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledException}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.DistributionUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.core.carbon.datastore.block.Distributable
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
+import org.apache.carbondata.scan.expression.Expression
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+
+/**
+ * This RDD is used to perform queries on CarbonData files. Before sending tasks to scan
+ * CarbonData files, this RDD leverages CarbonData's index information to do CarbonData
+ * file-level filtering on the driver side.
+ */
+class CarbonScanRDD[V: ClassTag](
+ @transient sc: SparkContext,
+ columnProjection: CarbonProjection,
+ filterExpression: Expression,
+ identifier: AbsoluteTableIdentifier,
+ @transient carbonTable: CarbonTable)
+ extends RDD[V](sc, Nil) {
+
+ private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
+ private val jobTrackerId: String = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+ formatter.format(new Date())
+ }
+
+ @transient private val jobId = new JobID(jobTrackerId, id)
+ @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ override def getPartitions: Array[Partition] = {
+ val job = Job.getInstance(new Configuration())
+ val format = prepareInputFormatForDriver(job.getConfiguration)
+
+ // initialise query_id for job
+ job.getConfiguration.set("query.id", queryId)
+
+ // get splits
+ val splits = format.getSplits(job)
+ val result = distributeSplits(splits)
+ result
+ }
+
+ private def distributeSplits(splits: util.List[InputSplit]): Array[Partition] = {
+ // this function distributes the splits based on the following logic:
+ // 1. based on data locality, to keep splits balanced across all available nodes
+ // 2. if the number of split for one
+
+ var statistic = new QueryStatistic()
+ val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+ val parallelism = sparkContext.defaultParallelism
+ val result = new util.ArrayList[Partition](parallelism)
+ var noOfBlocks = 0
+ var noOfNodes = 0
+ var noOfTasks = 0
+
+ if (!splits.isEmpty) {
+ // create a list of blocks from the splits
+ val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
+
+ // get the list of executors and map blocks to executors based on locality
+ val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
+
+ // divide the blocks among the tasks of the nodes as per the data locality
+ val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
+ parallelism, activeNodes.toList.asJava)
+
+ statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
+ statisticRecorder.recordStatisticsForDriver(statistic, queryId)
+ statistic = new QueryStatistic()
+
+ var i = 0
+ // Create Spark Partition for each task and assign blocks
+ nodeBlockMapping.asScala.foreach { case (node, blockList) =>
+ blockList.asScala.foreach { blocksPerTask =>
+ val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
+ if (blocksPerTask.size() != 0) {
+ val multiBlockSplit = new CarbonMultiBlockSplit(identifier, splits.asJava, node)
+ val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
+ result.add(partition)
+ i += 1
+ }
+ }
+ }
+
+ noOfBlocks = splits.size
+ noOfNodes = nodeBlockMapping.size
+ noOfTasks = result.size()
+
+ statistic = new QueryStatistic()
+ statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
+ System.currentTimeMillis)
+ statisticRecorder.recordStatisticsForDriver(statistic, queryId)
+ statisticRecorder.logStatisticsAsTableDriver()
+ }
+ logInfo(
+ s"""
+ | Identified no.of.blocks: $noOfBlocks,
+ | no.of.tasks: $noOfTasks,
+ | no.of.nodes: $noOfNodes,
+ | parallelism: $parallelism
+ """.stripMargin)
+ result.toArray(new Array[Partition](result.size()))
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[V] = {
+ val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
+ if (null == carbonPropertiesFilePath) {
+ System.setProperty("carbon.properties.filepath",
+ System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
+ )
+ }
+
+ val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+ val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
+ val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
+ val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+ val reader = format.createRecordReader(inputSplit, attemptContext)
+ reader.initialize(inputSplit, attemptContext)
+
+ val queryStartTime = System.currentTimeMillis
+
+ new Iterator[V] {
+ private var havePair = false
+ private var finished = false
+ private var count = 0
+
+ context.addTaskCompletionListener { context =>
+ logStatistics(queryStartTime, count)
+ reader.close()
+ }
+
+ override def hasNext: Boolean = {
+ if (context.isInterrupted) {
+ throw new TaskKilledException
+ }
+ if (!finished && !havePair) {
+ finished = !reader.nextKeyValue
+ if (finished) {
+ reader.close()
+ }
+ havePair = !finished
+ }
+ !finished
+ }
+
+ override def next(): V = {
+ if (!hasNext) {
+ throw new java.util.NoSuchElementException("End of stream")
+ }
+ havePair = false
+ val value: V = reader.getCurrentValue
+ count += 1
+ value
+ }
+ }
+ }
+
+ private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[V] = {
+ CarbonInputFormat.setCarbonTable(conf, carbonTable)
+ createInputFormat(conf)
+ }
+
+ private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[V] = {
+ CarbonInputFormat.setCarbonReadSupport(conf, SparkCommonEnv.readSupportClass)
+ createInputFormat(conf)
+ }
+
+ private def createInputFormat(conf: Configuration): CarbonInputFormat[V] = {
+ val format = new CarbonInputFormat[V]
+ CarbonInputFormat.setTablePath(conf, identifier.getTablePath)
+ CarbonInputFormat.setFilterPredicates(conf, filterExpression)
+ CarbonInputFormat.setColumnProjection(conf, columnProjection)
+ format
+ }
+
+ def logStatistics(queryStartTime: Long, recordCount: Int): Unit = {
+ var queryStatistic = new QueryStatistic()
+ queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+ System.currentTimeMillis - queryStartTime)
+ val statisticRecorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId)
+ statisticRecorder.recordStatistics(queryStatistic)
+ // result size
+ queryStatistic = new QueryStatistic()
+ queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
+ statisticRecorder.recordStatistics(queryStatistic)
+ // print executor query statistics for each task_id
+ statisticRecorder.logStatisticsAsTableExecutor()
+ }
+
+ /**
+ * Get the preferred locations where to launch this task.
+ */
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ val theSplit = split.asInstanceOf[CarbonSparkPartition]
+ val firstOptionLocation = theSplit.split.value.getLocations.filter(_ != "localhost")
+ firstOptionLocation
+ }
+}
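Taken together, the class above pushes the projection and filter into CarbonInputFormat on the driver, builds one CarbonSparkPartition per task from the node/block mapping, and then reads records through the Hadoop RecordReader on each executor. A hedged usage sketch, not part of this commit (the CarbonProjection.addColumn calls and the value type are assumptions for illustration):

    // Assumes a SparkContext `sc` and a loaded CarbonTable `table` are already available.
    val projection = new CarbonProjection()
    projection.addColumn("name")    // assumed CarbonProjection API
    projection.addColumn("salary")

    val scanRdd = new CarbonScanRDD[Array[Object]](
      sc,
      projection,
      null,                         // no filter expression: scan all blocks
      table.getAbsoluteTableIdentifier,
      table)
    scanRdd.collect().foreach(row => println(row.mkString(",")))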
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
new file mode 100644
index 0000000..28a9140
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util
+import java.util.concurrent._
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.LoadMetadataDetails
+import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark._
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CompactionCallable, CompactionType}
+import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil}
+
+/**
+ * Common functions for data life cycle management
+ */
+object DataManagementFunc {
+
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def deleteLoadByDate(
+ sqlContext: SQLContext,
+ schema: CarbonDataLoadSchema,
+ databaseName: String,
+ tableName: String,
+ storePath: String,
+ dateField: String,
+ dateFieldActualName: String,
+ dateValue: String) {
+
+ val sc = sqlContext
+ // Delete the records based on the given date
+ val table = CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)
+ val loadMetadataDetailsArray =
+ SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath).toList
+ val resultMap = new CarbonDeleteLoadByDateRDD(
+ sc.sparkContext,
+ new DeletedLoadResultImpl(),
+ databaseName,
+ table.getDatabaseName,
+ dateField,
+ dateFieldActualName,
+ dateValue,
+ table.getFactTableName,
+ tableName,
+ storePath,
+ loadMetadataDetailsArray).collect.groupBy(_._1)
+
+ var updatedLoadMetadataDetailsList = new ListBuffer[LoadMetadataDetails]()
+ if (resultMap.nonEmpty) {
+ if (resultMap.size == 1) {
+ if (resultMap.contains("")) {
+ LOGGER.error("Delete by Date request is failed")
+ sys.error("Delete by Date request is failed, potential causes " +
+ "Empty store or Invalid column type, For more details please refer logs.")
+ }
+ }
+ val updatedloadMetadataDetails = loadMetadataDetailsArray.map { elem => {
+ var statusList = resultMap.get(elem.getLoadName)
+ // check for the merged load folder.
+ if (statusList.isEmpty && null != elem.getMergedLoadName) {
+ statusList = resultMap.get(elem.getMergedLoadName)
+ }
+
+ if (statusList.isDefined) {
+ elem.setModificationOrdeletionTimesStamp(CarbonLoaderUtil.readCurrentTime())
+ // if at least one CarbonCommonConstants.MARKED_FOR_UPDATE status exists,
+ // use MARKED_FOR_UPDATE
+ if (statusList.get
+ .forall(status => status._2 == CarbonCommonConstants.MARKED_FOR_DELETE)) {
+ elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
+ } else {
+ elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_UPDATE)
+ updatedLoadMetadataDetailsList += elem
+ }
+ elem
+ } else {
+ elem
+ }
+ }
+
+ }
+
+ // Save the load metadata
+ val carbonLock = CarbonLockFactory
+ .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ LockUsage.METADATA_LOCK
+ )
+ try {
+ if (carbonLock.lockWithRetries()) {
+ LOGGER.info("Successfully got the table metadata file lock")
+ if (updatedLoadMetadataDetailsList.nonEmpty) {
+ // TODO: Load Aggregate tables after retention.
+ }
+
+ // write
+ CarbonLoaderUtil.writeLoadMetadata(
+ schema,
+ databaseName,
+ table.getDatabaseName,
+ updatedloadMetadataDetails.asJava
+ )
+ }
+ } finally {
+ if (carbonLock.unlock()) {
+ LOGGER.info("unlock the table metadata file successfully")
+ } else {
+ LOGGER.error("Unable to unlock the metadata lock")
+ }
+ }
+ } else {
+ LOGGER.error("Delete by Date request is failed")
+ LOGGER.audit(s"The delete load by date is failed for $databaseName.$tableName")
+ sys.error("Delete by Date request is failed, potential causes " +
+ "Empty store or Invalid column type, For more details please refer logs.")
+ }
+ }
+
+ def executeCompaction(carbonLoadModel: CarbonLoadModel,
+ storePath: String,
+ compactionModel: CompactionModel,
+ executor: ExecutorService,
+ sqlContext: SQLContext,
+ kettleHomePath: String,
+ storeLocation: String): Unit = {
+ val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
+ carbonLoadModel.getLoadMetadataDetails
+ )
+ CarbonDataMergerUtil.sortSegments(sortedSegments)
+
+ var segList = carbonLoadModel.getLoadMetadataDetails
+ var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+ storePath,
+ carbonLoadModel,
+ compactionModel.compactionSize,
+ segList,
+ compactionModel.compactionType
+ )
+ while (loadsToMerge.size() > 1) {
+ val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
+ deletePartialLoadsInCompaction(carbonLoadModel)
+ val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
+ CarbonCommonConstants
+ .DEFAULT_COLLECTION_SIZE
+ )
+
+ scanSegmentsAndSubmitJob(futureList,
+ loadsToMerge,
+ executor,
+ storePath,
+ sqlContext,
+ compactionModel,
+ kettleHomePath,
+ carbonLoadModel,
+ storeLocation
+ )
+
+ try {
+
+ futureList.asScala.foreach(future => {
+ future.get
+ }
+ )
+ } catch {
+ case e: Exception =>
+ LOGGER.error(s"Exception in compaction thread ${ e.getMessage }")
+ throw e
+ }
+
+
+ // scan again and determine if anything is there to merge again.
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+ segList = carbonLoadModel.getLoadMetadataDetails
+ // in case of major compaction we scan only once and then exit, otherwise it would
+ // keep doing major compaction for the new loads as well, so the newly added
+ // segments are excluded.
+ if (compactionModel.compactionType == CompactionType.MAJOR_COMPACTION) {
+
+ segList = CarbonDataMergerUtil
+ .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
+ }
+ loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+ storePath,
+ carbonLoadModel,
+ compactionModel.compactionSize,
+ segList,
+ compactionModel.compactionType
+ )
+ }
+ }
+
+ /**
+ * This will submit the loads to be merged into the executor.
+ *
+ * @param futureList
+ */
+ def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
+ loadsToMerge: util
+ .List[LoadMetadataDetails],
+ executor: ExecutorService,
+ storePath: String,
+ sqlContext: SQLContext,
+ compactionModel: CompactionModel,
+ kettleHomePath: String,
+ carbonLoadModel: CarbonLoadModel,
+ storeLocation: String): Unit = {
+
+ loadsToMerge.asScala.foreach(seg => {
+ LOGGER.info("loads identified for merge is " + seg.getLoadName)
+ }
+ )
+
+ val compactionCallableModel = CompactionCallableModel(storePath,
+ carbonLoadModel,
+ storeLocation,
+ compactionModel.carbonTable,
+ kettleHomePath,
+ compactionModel.tableCreationTime,
+ loadsToMerge,
+ sqlContext,
+ compactionModel.compactionType
+ )
+
+ val future: Future[Void] = executor
+ .submit(new CompactionCallable(compactionCallableModel
+ )
+ )
+ futureList.add(future)
+ }
+
+ def prepareCarbonLoadModel(storePath: String,
+ table: CarbonTable,
+ newCarbonLoadModel: CarbonLoadModel): Unit = {
+ newCarbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
+ newCarbonLoadModel.setTableName(table.getFactTableName)
+ val dataLoadSchema = new CarbonDataLoadSchema(table)
+ // Need to fill dimension relation
+ newCarbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+ newCarbonLoadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
+ newCarbonLoadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
+ newCarbonLoadModel.setStorePath(table.getStorePath)
+ CommonUtil.readLoadMetadataDetails(newCarbonLoadModel, storePath)
+ val loadStartTime = CarbonLoaderUtil.readCurrentTime()
+ newCarbonLoadModel.setFactTimeStamp(loadStartTime)
+ }
+
+ def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = {
+ // Delete any partially loaded data if present.
+ // In some cases a segment folder present in the store has no entry in the status
+ // file, so delete those folders.
+ try {
+ CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
+ } catch {
+ case e: Exception =>
+ LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
+ s" ${ e.getMessage }")
+ }
+ }
+
+ def deleteLoadsAndUpdateMetadata(
+ carbonLoadModel: CarbonLoadModel,
+ table: CarbonTable,
+ storePath: String,
+ isForceDeletion: Boolean): Unit = {
+ if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
+ val loadMetadataFilePath = CarbonLoaderUtil
+ .extractLoadMetadataFileLocation(carbonLoadModel)
+ val details = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
+ val carbonTableStatusLock = CarbonLockFactory
+ .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ LockUsage.TABLE_STATUS_LOCK)
+
+ // Delete marked loads
+ val isUpdationRequired = DeleteLoadFolders
+ .deleteLoadFoldersFromFileSystem(carbonLoadModel, storePath, isForceDeletion, details)
+
+ if (isUpdationRequired) {
+ try {
+ // Update the load metadata file after cleaning deleted nodes
+ if (carbonTableStatusLock.lockWithRetries()) {
+ LOGGER.info("Table status lock has been successfully acquired.")
+
+ // read latest table status again.
+ val latestMetadata = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
+
+ // update the metadata details from old to new status.
+ val latestStatus = CarbonLoaderUtil
+ .updateLoadMetadataFromOldToNew(details, latestMetadata)
+
+ CarbonLoaderUtil.writeLoadMetadata(
+ carbonLoadModel.getCarbonDataLoadSchema,
+ carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName, latestStatus)
+ } else {
+ val errorMsg = "Clean files request is failed for " +
+ s"${ carbonLoadModel.getDatabaseName }." +
+ s"${ carbonLoadModel.getTableName }" +
+ ". Not able to acquire the table status lock due to other operation " +
+ "running in the background."
+ LOGGER.audit(errorMsg)
+ LOGGER.error(errorMsg)
+ throw new Exception(errorMsg + " Please try after some time.")
+ }
+ } finally {
+ CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
+ }
+ }
+ }
+ }
+
+ def cleanFiles(
+ sc: SparkContext,
+ carbonLoadModel: CarbonLoadModel,
+ storePath: String) {
+ val table = CarbonMetadata.getInstance.getCarbonTable(
+ carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName)
+ val carbonCleanFilesLock = CarbonLockFactory.getCarbonLockObj(
+ table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, LockUsage.CLEAN_FILES_LOCK)
+ try {
+ if (carbonCleanFilesLock.lockWithRetries()) {
+ LOGGER.info("Clean files lock has been successfully acquired.")
+ deleteLoadsAndUpdateMetadata(carbonLoadModel,
+ table,
+ storePath,
+ isForceDeletion = true)
+ } else {
+ val errorMsg = "Clean files request is failed for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+ ". Not able to acquire the clean files lock due to another clean files " +
+ "operation is running in the background."
+ LOGGER.audit(errorMsg)
+ LOGGER.error(errorMsg)
+ throw new Exception(errorMsg + " Please try after some time.")
+
+ }
+ } finally {
+ CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+ }
+ }
+}
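These functions are extracted into a common object so that both the spark and spark2 variants of CarbonDataRDDFactory can delegate to them (see the call-site changes further below). A rough sketch of the clean-files path from a caller's point of view; the setter calls are the same ones prepareCarbonLoadModel uses above, while the concrete values and the variables in scope are illustrative:

    // Illustrative only: assumes a SparkContext `sc` and a store path are in scope,
    // and that the table is already registered in CarbonMetadata.
    val loadModel = new CarbonLoadModel()
    loadModel.setDatabaseName("default")
    loadModel.setTableName("t1")
    // Acquires the CLEAN_FILES lock, deletes stale load folders and rewrites the table status.
    DataManagementFunc.cleanFiles(sc, loadModel, storePath)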
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
new file mode 100644
index 0000000..32770f7
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+
+import org.apache.carbondata.common.CarbonIterator
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.DataLoadExecutor
+import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
+
+ @transient
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ private def writeObject(out: ObjectOutputStream): Unit =
+ try {
+ out.defaultWriteObject()
+ value.write(out)
+ } catch {
+ case e: IOException =>
+ LOGGER.error(e, "Exception encountered")
+ throw e
+ case NonFatal(e) =>
+ LOGGER.error(e, "Exception encountered")
+ throw new IOException(e)
+ }
+
+
+ private def readObject(in: ObjectInputStream): Unit =
+ try {
+ value = new Configuration(false)
+ value.readFields(in)
+ } catch {
+ case e: IOException =>
+ LOGGER.error(e, "Exception encountered")
+ throw e
+ case NonFatal(e) =>
+ LOGGER.error(e, "Exception encountered")
+ throw new IOException(e)
+ }
+}
+
+/**
+ * It loads data into CarbonData using AbstractDataLoadProcessorStep
+ */
+class NewCarbonDataLoadRDD[K, V](
+ sc: SparkContext,
+ result: DataLoadResult[K, V],
+ carbonLoadModel: CarbonLoadModel,
+ loadCount: Integer,
+ blocksGroupBy: Array[(String, Array[BlockDetails])],
+ isTableSplitPartition: Boolean)
+ extends RDD[(K, V)](sc, Nil) {
+
+ sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+ private val jobTrackerId: String = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+ formatter.format(new Date())
+ }
+
+ // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+ private val confBroadcast =
+ sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
+
+ override def getPartitions: Array[Partition] = {
+ if (isTableSplitPartition) {
+ // for table split partition
+ var splits: Array[TableSplit] = null
+
+ if (carbonLoadModel.isDirectLoad) {
+ splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
+ } else {
+ splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName, null)
+ }
+
+ splits.zipWithIndex.map { s =>
+ // filter by the partition's unique id; only one entry will match, so take element 0
+ val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
+ p._1 == s._1.getPartition.getUniqueID)(0)._2
+ new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
+ }
+ } else {
+ // for node partition
+ blocksGroupBy.zipWithIndex.map { b =>
+ new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
+ }
+ }
+ }
+
+ override def checkpoint() {
+ // Do nothing. Hadoop RDD should not be checkpointed.
+ }
+
+ override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val iter = new Iterator[(K, V)] {
+ var partitionID = "0"
+ val loadMetadataDetails = new LoadMetadataDetails()
+ var model: CarbonLoadModel = _
+ var uniqueLoadStatusId =
+ carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
+ try {
+ loadMetadataDetails.setPartitionCount(partitionID)
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+
+ carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+ val recordReaders = getInputIterators
+ val loader = new SparkPartitionLoader(model,
+ theSplit.index,
+ null,
+ null,
+ loadCount,
+ loadMetadataDetails)
+ // Initialize carbon properties
+ loader.initialize()
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ new DataLoadExecutor().execute(model,
+ loader.storeLocation,
+ recordReaders)
+ } catch {
+ case e: BadRecordFoundException =>
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+ logInfo("Bad Record Found")
+ case e: Exception =>
+ logInfo("DataLoad failure", e)
+ LOGGER.error(e)
+ throw e
+ }
+
+ def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
+ val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, theSplit.index, 0)
+ var configuration: Configuration = confBroadcast.value.value
+ if (configuration == null) {
+ configuration = new Configuration()
+ }
+ configureCSVInputFormat(configuration)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(configuration, attemptId)
+ val format = new CSVInputFormat
+ if (isTableSplitPartition) {
+ // for table split partition
+ val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
+ logInfo("Input split: " + split.serializableHadoopSplit.value)
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ if (carbonLoadModel.isDirectLoad) {
+ model = carbonLoadModel.getCopyWithPartition(
+ split.serializableHadoopSplit.value.getPartition.getUniqueID,
+ split.serializableHadoopSplit.value.getPartition.getFilesPath,
+ carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+ } else {
+ model = carbonLoadModel.getCopyWithPartition(
+ split.serializableHadoopSplit.value.getPartition.getUniqueID)
+ }
+ partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
+
+ StandardLogService.setThreadName(partitionID, null)
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
+ partitionID, split.partitionBlocksDetail.length)
+ val readers =
+ split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
+ readers.zipWithIndex.map { case (reader, index) =>
+ new RecordReaderIterator(reader,
+ split.partitionBlocksDetail(index),
+ hadoopAttemptContext)
+ }
+ } else {
+ // for node partition
+ val split = theSplit.asInstanceOf[CarbonNodePartition]
+ logInfo("Input split: " + split.serializableHadoopSplit)
+ logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
+ split.serializableHadoopSplit, split.nodeBlocksDetail.length)
+ val blocksID = generateBlocksID
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ if (carbonLoadModel.isDirectLoad) {
+ val filelist: java.util.List[String] = new java.util.ArrayList[String](
+ CarbonCommonConstants.CONSTANT_SIZE_TEN)
+ CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
+ model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
+ carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+ } else {
+ model = carbonLoadModel.getCopyWithPartition(partitionID)
+ }
+ StandardLogService.setThreadName(blocksID, null)
+ val readers =
+ split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
+ readers.zipWithIndex.map { case (reader, index) =>
+ new RecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
+ }
+ }
+ }
+
+ def configureCSVInputFormat(configuration: Configuration): Unit = {
+ CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
+ CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter)
+ CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar)
+ CSVInputFormat.setHeaderExtractionEnabled(configuration,
+ carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty)
+ CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar)
+ CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+ CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT))
+ }
+
+ /**
+ * generate blocks id
+ *
+ * @return
+ */
+ def generateBlocksID: String = {
+ if (isTableSplitPartition) {
+ carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+ theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
+ .getPartition.getUniqueID + "_" + UUID.randomUUID()
+ } else {
+ carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+ UUID.randomUUID()
+ }
+ }
+
+ var finished = false
+
+ override def hasNext: Boolean = {
+ !finished
+ }
+
+ override def next(): (K, V) = {
+ finished = true
+ result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+ }
+ }
+ iter
+ }
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ isTableSplitPartition match {
+ case true =>
+ // for table split partition
+ val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
+ val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
+ location
+ case false =>
+ // for node partition
+ val theSplit = split.asInstanceOf[CarbonNodePartition]
+ val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+ logInfo("Preferred Location for split : " + firstOptionLocation.head)
+ val blockMap = new util.LinkedHashMap[String, Integer]()
+ val tableBlocks = theSplit.blocksDetails
+ tableBlocks.foreach { tableBlock =>
+ tableBlock.getLocations.foreach { location =>
+ if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
+ val currentCount = blockMap.get(location)
+ if (currentCount == null) {
+ blockMap.put(location, 1)
+ } else {
+ blockMap.put(location, currentCount + 1)
+ }
+ }
+ }
+ }
+
+ val sortedList = blockMap.entrySet().asScala.toSeq.sortWith {(nodeCount1, nodeCount2) =>
+ nodeCount1.getValue > nodeCount2.getValue
+ }
+
+ val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+ firstOptionLocation ++ sortedNodesList
+ }
+ }
+}
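The SerializableConfiguration wrapper at the top of this file exists because Hadoop's Configuration is not Serializable, yet every load task needs it to build the CSVInputFormat readers; broadcasting one wrapped copy avoids shipping the roughly 10 KB configuration with each task closure. The pattern reduced to its two ends, as used by this RDD (variable names follow the code above):

    // Driver side: wrap the Hadoop configuration once and broadcast it.
    val confBroadcast = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

    // Executor side (inside compute): unwrap it back into a usable Configuration,
    // falling back to a fresh one if nothing usable was broadcast.
    val configuration = Option(confBroadcast.value.value).getOrElse(new Configuration())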
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkCommonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkCommonEnv.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkCommonEnv.scala
new file mode 100644
index 0000000..bf614b1
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkCommonEnv.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+
+// Used to break the cyclic dependency between the carbon-spark-common module and the
+// carbon-spark / carbon-spark2 modules; variables or functions that differ between
+// carbon-spark and carbon-spark2 are set here
+object SparkCommonEnv {
+
+ var readSupportClass: Class[_ <: CarbonReadSupport[_]] = _
+
+ var numExistingExecutors: Int = _
+
+}
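Because this object exists only to break the module cycle, it carries no logic of its own; each integration module is expected to assign these fields during environment initialization so that the shared code (CarbonScanRDD above, DistributionUtil below) can read them. A hedged sketch of that wiring; the read-support class name and the executor-count heuristic are assumptions, not code from this commit:

    // Illustrative initialization, e.g. performed once when the Carbon environment is created.
    SparkCommonEnv.readSupportClass = classOf[SparkRowReadSupportImpl]  // assumed implementation
    // getExecutorMemoryStatus also counts the driver, hence the -1 (assumed heuristic).
    SparkCommonEnv.numExistingExecutors = sc.getExecutorMemoryStatus.size - 1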
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 8b1a2bb..5b9353e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.carbon.datastore.block.Distributable
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.SparkCommonEnv
object DistributionUtil {
@transient
@@ -215,9 +216,6 @@ object DistributionUtil {
nodes.distinct.toSeq
}
- // Hack for spark2 integration
- var numExistingExecutors: Int = _
-
/**
* Requesting the extra executors other than the existing ones.
*
@@ -233,13 +231,11 @@ object DistributionUtil {
hostToLocalTaskCount: Map[String, Int] = Map.empty): Boolean = {
sc.schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
- LOGGER
- .info(
+ LOGGER.info(
s"number of required executors are = $requiredExecutors and existing executors are = " +
- s"$numExistingExecutors")
+ s"${SparkCommonEnv.numExistingExecutors}")
if (requiredExecutors > 0) {
- LOGGER
- .info(s"Requesting total executors: $requiredExecutors")
+ LOGGER.info(s"Requesting total executors: $requiredExecutors")
b.requestTotalExecutors(requiredExecutors, localityAwareTasks, hostToLocalTaskCount)
}
true
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index e5eb78a..8463477 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -40,6 +40,7 @@ import org.apache.spark.util.SparkUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier, ColumnarFormatVersion}
import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
@@ -64,105 +65,6 @@ object CarbonDataRDDFactory {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- def deleteLoadByDate(
- sqlContext: SQLContext,
- schema: CarbonDataLoadSchema,
- databaseName: String,
- tableName: String,
- storePath: String,
- dateField: String,
- dateFieldActualName: String,
- dateValue: String) {
-
- val sc = sqlContext
- // Delete the records based on data
- val table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
- .getCarbonTable(databaseName + "_" + tableName)
- val loadMetadataDetailsArray =
- SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath).toList
- val resultMap = new CarbonDeleteLoadByDateRDD(
- sc.sparkContext,
- new DeletedLoadResultImpl(),
- databaseName,
- table.getDatabaseName,
- dateField,
- dateFieldActualName,
- dateValue,
- table.getFactTableName,
- tableName,
- storePath,
- loadMetadataDetailsArray).collect.groupBy(_._1)
-
- var updatedLoadMetadataDetailsList = new ListBuffer[LoadMetadataDetails]()
- if (resultMap.nonEmpty) {
- if (resultMap.size == 1) {
- if (resultMap.contains("")) {
- LOGGER.error("Delete by Date request is failed")
- sys.error("Delete by Date request is failed, potential causes " +
- "Empty store or Invalid column type, For more details please refer logs.")
- }
- }
- val updatedloadMetadataDetails = loadMetadataDetailsArray.map { elem => {
- var statusList = resultMap.get(elem.getLoadName)
- // check for the merged load folder.
- if (statusList.isEmpty && null != elem.getMergedLoadName) {
- statusList = resultMap.get(elem.getMergedLoadName)
- }
-
- if (statusList.isDefined) {
- elem.setModificationOrdeletionTimesStamp(CarbonLoaderUtil.readCurrentTime())
- // if atleast on CarbonCommonConstants.MARKED_FOR_UPDATE status exist,
- // use MARKED_FOR_UPDATE
- if (statusList.get
- .forall(status => status._2 == CarbonCommonConstants.MARKED_FOR_DELETE)) {
- elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
- } else {
- elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_UPDATE)
- updatedLoadMetadataDetailsList += elem
- }
- elem
- } else {
- elem
- }
- }
-
- }
-
- // Save the load metadata
- val carbonLock = CarbonLockFactory
- .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK
- )
- try {
- if (carbonLock.lockWithRetries()) {
- LOGGER.info("Successfully got the table metadata file lock")
- if (updatedLoadMetadataDetailsList.nonEmpty) {
- // TODO: Load Aggregate tables after retention.
- }
-
- // write
- CarbonLoaderUtil.writeLoadMetadata(
- schema,
- databaseName,
- table.getDatabaseName,
- updatedloadMetadataDetails.asJava
- )
- }
- } finally {
- if (carbonLock.unlock()) {
- LOGGER.info("unlock the table metadata file successfully")
- } else {
- LOGGER.error("Unable to unlock the metadata lock")
- }
- }
- } else {
- LOGGER.error("Delete by Date request is failed")
- LOGGER.audit(s"The delete load by date is failed for $databaseName.$tableName")
- sys.error("Delete by Date request is failed, potential causes " +
- "Empty store or Invalid column type, For more details please refer logs.")
- }
- }
-
def alterTableForCompaction(sqlContext: SQLContext,
alterTableModel: AlterTableModel,
carbonLoadModel: CarbonLoadModel,
@@ -306,118 +208,6 @@ object CarbonDataRDDFactory {
}
}
- def executeCompaction(carbonLoadModel: CarbonLoadModel,
- storePath: String,
- compactionModel: CompactionModel,
- executor: ExecutorService,
- sqlContext: SQLContext,
- kettleHomePath: String,
- storeLocation: String): Unit = {
- val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
- carbonLoadModel.getLoadMetadataDetails
- )
- CarbonDataMergerUtil.sortSegments(sortedSegments)
-
- var segList = carbonLoadModel.getLoadMetadataDetails
- var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
- storePath,
- carbonLoadModel,
- compactionModel.compactionSize,
- segList,
- compactionModel.compactionType
- )
- while (loadsToMerge.size() > 1) {
- val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
- deletePartialLoadsInCompaction(carbonLoadModel)
- val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
- CarbonCommonConstants
- .DEFAULT_COLLECTION_SIZE
- )
-
- scanSegmentsAndSubmitJob(futureList,
- loadsToMerge,
- executor,
- storePath,
- sqlContext,
- compactionModel,
- kettleHomePath,
- carbonLoadModel,
- storeLocation
- )
-
- try {
-
- futureList.asScala.foreach(future => {
- future.get
- }
- )
- } catch {
- case e: Exception =>
- LOGGER.error(s"Exception in compaction thread ${ e.getMessage }")
- throw e
- }
-
-
- // scan again and determine if anything is there to merge again.
- CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
- segList = carbonLoadModel.getLoadMetadataDetails
- // in case of major compaction we will scan only once and come out as it will keep
- // on doing major for the new loads also.
- // excluding the newly added segments.
- if (compactionModel.compactionType == CompactionType.MAJOR_COMPACTION) {
-
- segList = CarbonDataMergerUtil
- .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
- }
- loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
- storePath,
- carbonLoadModel,
- compactionModel.compactionSize,
- segList,
- compactionModel.compactionType
- )
- }
- }
-
- /**
- * This will submit the loads to be merged into the executor.
- *
- * @param futureList
- */
- def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
- loadsToMerge: util
- .List[LoadMetadataDetails],
- executor: ExecutorService,
- storePath: String,
- sqlContext: SQLContext,
- compactionModel: CompactionModel,
- kettleHomePath: String,
- carbonLoadModel: CarbonLoadModel,
- storeLocation: String): Unit = {
-
- loadsToMerge.asScala.foreach(seg => {
- LOGGER.info("loads identified for merge is " + seg.getLoadName)
- }
- )
-
- val compactionCallableModel = CompactionCallableModel(storePath,
- carbonLoadModel,
- storeLocation,
- compactionModel.carbonTable,
- kettleHomePath,
- compactionModel.tableCreationTime,
- loadsToMerge,
- sqlContext,
- compactionModel.compactionType
- )
-
- val future: Future[Void] = executor
- .submit(new CompactionCallable(compactionCallableModel
- )
- )
- futureList.add(future)
- }
-
def startCompactionThreads(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
storePath: String,
@@ -447,7 +237,7 @@ object CarbonDataRDDFactory {
var triggeredCompactionStatus = false
var exception: Exception = null
try {
- executeCompaction(carbonLoadModel: CarbonLoadModel,
+ DataManagementFunc.executeCompaction(carbonLoadModel: CarbonLoadModel,
storePath: String,
compactionModel: CompactionModel,
executor, sqlContext, kettleHomePath, storeLocation
@@ -479,7 +269,7 @@ object CarbonDataRDDFactory {
val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
val newCarbonLoadModel = new CarbonLoadModel()
- prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
+ DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
val tableCreationTime = CarbonEnv.get.carbonMetastore
.getTableCreationTime(newCarbonLoadModel.getDatabaseName,
newCarbonLoadModel.getTableName
@@ -496,7 +286,7 @@ object CarbonDataRDDFactory {
)
// proceed for compaction
try {
- executeCompaction(newCarbonLoadModel,
+ DataManagementFunc.executeCompaction(newCarbonLoadModel,
newCarbonLoadModel.getStorePath,
newcompactionModel,
executor, sqlContext, kettleHomePath, storeLocation
@@ -534,7 +324,7 @@ object CarbonDataRDDFactory {
}
} finally {
executor.shutdownNow()
- deletePartialLoadsInCompaction(carbonLoadModel)
+ DataManagementFunc.deletePartialLoadsInCompaction(carbonLoadModel)
compactionLock.unlock()
}
}
@@ -544,41 +334,11 @@ object CarbonDataRDDFactory {
compactionThread.run()
}
- def prepareCarbonLoadModel(storePath: String,
- table: CarbonTable,
- newCarbonLoadModel: CarbonLoadModel): Unit = {
- newCarbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
- newCarbonLoadModel.setTableName(table.getFactTableName)
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- // Need to fill dimension relation
- newCarbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- newCarbonLoadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
- newCarbonLoadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
- newCarbonLoadModel.setStorePath(table.getStorePath)
- CommonUtil.readLoadMetadataDetails(newCarbonLoadModel, storePath)
- val loadStartTime = CarbonLoaderUtil.readCurrentTime()
- newCarbonLoadModel.setFactTimeStamp(loadStartTime)
- }
-
- def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = {
- // Deleting the any partially loaded data if present.
- // in some case the segment folder which is present in store will not have entry in
- // status.
- // so deleting those folders.
- try {
- CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
- } catch {
- case e: Exception =>
- LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
- s" ${ e.getMessage }")
- }
- }
-
def loadCarbonData(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
storePath: String,
kettleHomePath: String,
- columinar: Boolean,
+ columnar: Boolean,
partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
useKettle: Boolean,
dataFrame: Option[DataFrame] = None): Unit = {
@@ -673,7 +433,7 @@ object CarbonDataRDDFactory {
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
// Check if any load need to be deleted before loading new data
- deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, storePath,
+ DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, storePath,
isForceDeletion = false)
if (null == carbonLoadModel.getLoadMetadataDetails) {
CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -800,7 +560,6 @@ object CarbonDataRDDFactory {
}
val jobContext = new Job(hadoopConfiguration)
val rawSplits = inputFormat.getSplits(jobContext).toArray
- val result = new Array[Partition](rawSplits.size)
val blockList = rawSplits.map { inputSplit =>
val fileSplit = inputSplit.asInstanceOf[FileSplit]
new TableBlockInfo(fileSplit.getPath.toString,
@@ -855,7 +614,7 @@ object CarbonDataRDDFactory {
carbonLoadModel,
storePath,
kettleHomePath,
- columinar,
+ columnar,
currentLoadCount,
tableCreationTime,
schemaLastUpdatedTime,
@@ -887,7 +646,7 @@ object CarbonDataRDDFactory {
carbonLoadModel,
storePath,
kettleHomePath,
- columinar,
+ columnar,
currentLoadCount,
tableCreationTime,
schemaLastUpdatedTime,
@@ -1003,94 +762,4 @@ object CarbonDataRDDFactory {
}
- def deleteLoadsAndUpdateMetadata(
- carbonLoadModel: CarbonLoadModel,
- table: CarbonTable,
- storePath: String,
- isForceDeletion: Boolean): Unit = {
- if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
- val loadMetadataFilePath = CarbonLoaderUtil
- .extractLoadMetadataFileLocation(carbonLoadModel)
- val details = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
- val carbonTableStatusLock = CarbonLockFactory
- .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.TABLE_STATUS_LOCK)
-
- // Delete marked loads
- val isUpdationRequired = DeleteLoadFolders
- .deleteLoadFoldersFromFileSystem(carbonLoadModel, storePath, isForceDeletion, details)
-
- if (isUpdationRequired) {
- try {
- // Update load metadate file after cleaning deleted nodes
- if (carbonTableStatusLock.lockWithRetries()) {
- LOGGER.info("Table status lock has been successfully acquired.")
-
- // read latest table status again.
- val latestMetadata = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
-
- // update the metadata details from old to new status.
- val latestStatus = CarbonLoaderUtil
- .updateLoadMetadataFromOldToNew(details, latestMetadata)
-
- CarbonLoaderUtil.writeLoadMetadata(
- carbonLoadModel.getCarbonDataLoadSchema,
- carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, latestStatus)
- } else {
- val errorMsg = "Clean files request is failed for " +
- s"${ carbonLoadModel.getDatabaseName }." +
- s"${ carbonLoadModel.getTableName }" +
- ". Not able to acquire the table status lock due to other operation " +
- "running in the background."
- LOGGER.audit(errorMsg)
- LOGGER.error(errorMsg)
- throw new Exception(errorMsg + " Please try after some time.")
- }
- } finally {
- CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
- }
- }
- }
- }
-
- def dropTable(
- sc: SparkContext,
- schema: String,
- table: String) {
- val v: Value[Array[Object]] = new ValueImpl()
- new CarbonDropTableRDD(sc, v, schema, table).collect
- }
-
- def cleanFiles(
- sc: SparkContext,
- carbonLoadModel: CarbonLoadModel,
- storePath: String) {
- val table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
- .getCarbonTable(carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName)
- val carbonCleanFilesLock = CarbonLockFactory
- .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.CLEAN_FILES_LOCK
- )
- try {
- if (carbonCleanFilesLock.lockWithRetries()) {
- LOGGER.info("Clean files lock has been successfully acquired.")
- deleteLoadsAndUpdateMetadata(carbonLoadModel,
- table,
- storePath,
- isForceDeletion = true)
- } else {
- val errorMsg = "Clean files request is failed for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
- ". Not able to acquire the clean files lock due to another clean files " +
- "operation is running in the background."
- LOGGER.audit(errorMsg)
- LOGGER.error(errorMsg)
- throw new Exception(errorMsg + " Please try after some time.")
-
- }
- } finally {
- CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
deleted file mode 100644
index cae99d1..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.Date
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{InputSplit, Job, JobID}
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, TaskContext, TaskKilledException}
-import org.apache.spark.mapred.CarbonHadoopMapReduceUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.hive.DistributionUtil
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
-import org.apache.carbondata.core.carbon.datastore.block.Distributable
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
-import org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport
-import org.apache.carbondata.scan.expression.Expression
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-
-class CarbonSparkPartition(
- val rddId: Int,
- val idx: Int,
- @transient val multiBlockSplit: CarbonMultiBlockSplit)
- extends Partition {
-
- val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)
-
- override val index: Int = idx
-
- override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-/**
- * This RDD is used to query CarbonData files. Before sending tasks to scan the files,
- * it leverages CarbonData's index information to perform CarbonData file-level
- * filtering on the driver side.
- */
-class CarbonScanRDD[V: ClassTag](
- @transient sc: SparkContext,
- columnProjection: Seq[Attribute],
- filterExpression: Expression,
- identifier: AbsoluteTableIdentifier,
- @transient carbonTable: CarbonTable)
- extends RDD[V](sc, Nil)
- with CarbonHadoopMapReduceUtil
- with Logging {
-
- private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
- private val jobTrackerId: String = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- formatter.format(new Date())
- }
-
- @transient private val jobId = new JobID(jobTrackerId, id)
- @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-
- override def getPartitions: Array[Partition] = {
- val job = Job.getInstance(new Configuration())
- val format = prepareInputFormatForDriver(job.getConfiguration)
-
- // initialise query_id for job
- job.getConfiguration.set("query.id", queryId)
-
- // get splits
- val splits = format.getSplits(job)
- val result = distributeSplits(splits)
- result
- }
-
- private def distributeSplits(splits: util.List[InputSplit]): Array[Partition] = {
- // this function distributes the splits based on the following logic:
- // 1. based on data locality, to balance the splits across all available nodes
- // 2. if the number of split for one
-
- var statistic = new QueryStatistic()
- val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
- val parallelism = sparkContext.defaultParallelism
- val result = new util.ArrayList[Partition](parallelism)
- var noOfBlocks = 0
- var noOfNodes = 0
- var noOfTasks = 0
-
- if (!splits.isEmpty) {
- // create a list of block based on split
- val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
-
- // get the list of executors and map blocks to executors based on locality
- val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
-
- // divide the blocks among the tasks of the nodes as per the data locality
- val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
- parallelism, activeNodes.toList.asJava)
-
- statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
- statisticRecorder.recordStatisticsForDriver(statistic, queryId)
- statistic = new QueryStatistic()
-
- var i = 0
- // Create Spark Partition for each task and assign blocks
- nodeBlockMapping.asScala.foreach { case (node, blockList) =>
- blockList.asScala.foreach { blocksPerTask =>
- val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
- if (blocksPerTask.size() != 0) {
- val multiBlockSplit = new CarbonMultiBlockSplit(identifier, splits.asJava, node)
- val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
- result.add(partition)
- i += 1
- }
- }
- }
-
- noOfBlocks = splits.size
- noOfNodes = nodeBlockMapping.size
- noOfTasks = result.size()
-
- statistic = new QueryStatistic()
- statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
- System.currentTimeMillis)
- statisticRecorder.recordStatisticsForDriver(statistic, queryId)
- statisticRecorder.logStatisticsAsTableDriver()
- }
- logInfo(
- s"""
- | Identified no.of.blocks: $noOfBlocks,
- | no.of.tasks: $noOfTasks,
- | no.of.nodes: $noOfNodes,
- | parallelism: $parallelism
- """.stripMargin)
- result.toArray(new Array[Partition](result.size()))
- }
-
- override def compute(split: Partition, context: TaskContext): Iterator[V] = {
- val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
- if (null == carbonPropertiesFilePath) {
- System.setProperty("carbon.properties.filepath",
- System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
- )
- }
-
- val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
- val attemptContext = newTaskAttemptContext(new Configuration(), attemptId)
- val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
- val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
- val reader = format.createRecordReader(inputSplit, attemptContext)
- reader.initialize(inputSplit, attemptContext)
-
- val queryStartTime = System.currentTimeMillis
-
- new Iterator[V] {
- private var havePair = false
- private var finished = false
- private var count = 0
-
- context.addTaskCompletionListener { context =>
- logStatistics(queryStartTime, count)
- reader.close()
- }
-
- override def hasNext: Boolean = {
- if (context.isInterrupted) {
- throw new TaskKilledException
- }
- if (!finished && !havePair) {
- finished = !reader.nextKeyValue
- if (finished) {
- reader.close()
- }
- havePair = !finished
- }
- !finished
- }
-
- override def next(): V = {
- if (!hasNext) {
- throw new java.util.NoSuchElementException("End of stream")
- }
- havePair = false
- val value: V = reader.getCurrentValue
- count += 1
- value
- }
- }
- }
-
- private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[V] = {
- CarbonInputFormat.setCarbonTable(conf, carbonTable)
- createInputFormat(conf)
- }
-
- private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[V] = {
- CarbonInputFormat.setCarbonReadSupport(classOf[RawDataReadSupport], conf)
- createInputFormat(conf)
- }
-
- private def createInputFormat(conf: Configuration): CarbonInputFormat[V] = {
- val format = new CarbonInputFormat[V]
- CarbonInputFormat.setTablePath(conf, identifier.getTablePath)
- CarbonInputFormat.setFilterPredicates(conf, filterExpression)
- val projection = new CarbonProjection
- columnProjection.foreach { attr =>
- projection.addColumn(attr.name)
- }
- CarbonInputFormat.setColumnProjection(conf, projection)
- format
- }
-
- def logStatistics(queryStartTime: Long, recordCount: Int): Unit = {
- var queryStatistic = new QueryStatistic()
- queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
- System.currentTimeMillis - queryStartTime)
- val statisticRecorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId)
- statisticRecorder.recordStatistics(queryStatistic)
- // result size
- queryStatistic = new QueryStatistic()
- queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
- statisticRecorder.recordStatistics(queryStatistic)
- // print executor query statistics for each task_id
- statisticRecorder.logStatisticsAsTableExecutor()
- }
-
- /**
- * Get the preferred locations where to launch this task.
- */
- override def getPreferredLocations(split: Partition): Seq[String] = {
- val theSplit = split.asInstanceOf[CarbonSparkPartition]
- val firstOptionLocation = theSplit.split.value.getLocations.filter(_ != "localhost")
- firstOptionLocation
- }
-}
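In the deleted CarbonScanRDD, compute() adapts a Hadoop RecordReader into a Scala Iterator and closes the reader as soon as the split is exhausted. A minimal, self-contained sketch of that adapter (the class name is illustrative; the task-completion hook and statistics logging are omitted):

  import org.apache.hadoop.mapreduce.RecordReader

  class ScanReaderIterator[V](reader: RecordReader[Void, V]) extends Iterator[V] {
    private var havePair = false
    private var finished = false

    override def hasNext: Boolean = {
      if (!finished && !havePair) {
        finished = !reader.nextKeyValue()
        if (finished) {
          reader.close()   // release resources as soon as the split is exhausted
        }
        havePair = !finished
      }
      !finished
    }

    override def next(): V = {
      if (!hasNext) {
        throw new java.util.NoSuchElementException("End of stream")
      }
      havePair = false
      reader.getCurrentValue
    }
  }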
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
deleted file mode 100644
index 44a2416..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.{Date, UUID}
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, CarbonSerializableConfiguration}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.common.CarbonIterator
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.StandardLogService
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
-import org.apache.carbondata.hadoop.csv.CSVInputFormat
-import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.DataLoadExecutor
-import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
-import org.apache.carbondata.spark.DataLoadResult
-import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-
-/**
- * It loads the data into CarbonData using @AbstractDataLoadProcessorStep
- */
-class NewCarbonDataLoadRDD[K, V](
- sc: SparkContext,
- result: DataLoadResult[K, V],
- carbonLoadModel: CarbonLoadModel,
- loadCount: Integer,
- blocksGroupBy: Array[(String, Array[BlockDetails])],
- isTableSplitPartition: Boolean)
- extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil {
-
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
- private val jobTrackerId: String = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- formatter.format(new Date())
- }
-
- // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
- private val confBroadcast =
- sc.broadcast(new CarbonSerializableConfiguration(sc.hadoopConfiguration))
-
- override def getPartitions: Array[Partition] = {
- if (isTableSplitPartition) {
- // for table split partition
- var splits: Array[TableSplit] = null
-
- if (carbonLoadModel.isDirectLoad) {
- splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
- } else {
- splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, null)
- }
-
- splits.zipWithIndex.map { s =>
- // filter by the partition's unique id; only one entry will match, so take element 0
- val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
- p._1 == s._1.getPartition.getUniqueID)(0)._2
- new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
- }
- } else {
- // for node partition
- blocksGroupBy.zipWithIndex.map { b =>
- new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
- }
- }
- }
-
- override def checkpoint() {
- // Do nothing. Hadoop RDD should not be checkpointed.
- }
-
- override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val iter = new Iterator[(K, V)] {
- var partitionID = "0"
- val loadMetadataDetails = new LoadMetadataDetails()
- var model: CarbonLoadModel = _
- var uniqueLoadStatusId =
- carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
- try {
- loadMetadataDetails.setPartitionCount(partitionID)
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
-
- carbonLoadModel.setSegmentId(String.valueOf(loadCount))
- val recordReaders = getInputIterators
- val loader = new SparkPartitionLoader(model,
- theSplit.index,
- null,
- null,
- loadCount,
- loadMetadataDetails)
- // Initialize to set carbon properties
- loader.initialize()
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
- new DataLoadExecutor().execute(model,
- loader.storeLocation,
- recordReaders)
- } catch {
- case e: BadRecordFoundException =>
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
- logInfo("Bad Record Found")
- case e: Exception =>
- logInfo("DataLoad failure", e)
- LOGGER.error(e)
- throw e
- }
-
- def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
- val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, theSplit.index, 0)
- var configuration: Configuration = confBroadcast.value.value
- // the broadcast configuration can be null in some cases; fall back to a new one
- if (configuration == null) {
- configuration = new Configuration()
- }
- configureCSVInputFormat(configuration)
- val hadoopAttemptContext = newTaskAttemptContext(configuration, attemptId)
- val format = new CSVInputFormat
- if (isTableSplitPartition) {
- // for table split partition
- val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
- logInfo("Input split: " + split.serializableHadoopSplit.value)
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- if (carbonLoadModel.isDirectLoad) {
- model = carbonLoadModel.getCopyWithPartition(
- split.serializableHadoopSplit.value.getPartition.getUniqueID,
- split.serializableHadoopSplit.value.getPartition.getFilesPath,
- carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
- } else {
- model = carbonLoadModel.getCopyWithPartition(
- split.serializableHadoopSplit.value.getPartition.getUniqueID)
- }
- partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
-
- StandardLogService.setThreadName(partitionID, null)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
- partitionID, split.partitionBlocksDetail.length)
- val readers =
- split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
- readers.zipWithIndex.map { case (reader, index) =>
- new RecordReaderIterator(reader,
- split.partitionBlocksDetail(index),
- hadoopAttemptContext)
- }
- } else {
- // for node partition
- val split = theSplit.asInstanceOf[CarbonNodePartition]
- logInfo("Input split: " + split.serializableHadoopSplit)
- logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
- split.serializableHadoopSplit, split.nodeBlocksDetail.length)
- val blocksID = gernerateBlocksID
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- if (carbonLoadModel.isDirectLoad) {
- val filelist: java.util.List[String] = new java.util.ArrayList[String](
- CarbonCommonConstants.CONSTANT_SIZE_TEN)
- CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
- model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
- carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
- } else {
- model = carbonLoadModel.getCopyWithPartition(partitionID)
- }
- StandardLogService.setThreadName(blocksID, null)
- val readers =
- split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
- readers.zipWithIndex.map { case (reader, index) =>
- new RecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
- }
- }
- }
-
- def configureCSVInputFormat(configuration: Configuration): Unit = {
- CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
- CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter)
- CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar)
- CSVInputFormat.setHeaderExtractionEnabled(configuration,
- carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty)
- CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar)
- CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
- CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT))
- }
-
- /**
- * generate blocks id
- *
- * @return
- */
- def gernerateBlocksID: String = {
- if (isTableSplitPartition) {
- carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
- theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
- .getPartition.getUniqueID + "_" + UUID.randomUUID()
- } else {
- carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
- UUID.randomUUID()
- }
- }
-
- var finished = false
-
- override def hasNext: Boolean = {
- !finished
- }
-
- override def next(): (K, V) = {
- finished = true
- result.getKey(uniqueLoadStatusId, loadMetadataDetails)
- }
- }
- iter
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- isTableSplitPartition match {
- case true =>
- // for table split partition
- val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
- val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
- location
- case false =>
- // for node partition
- val theSplit = split.asInstanceOf[CarbonNodePartition]
- val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
- logInfo("Preferred Location for split : " + firstOptionLocation.head)
- val blockMap = new util.LinkedHashMap[String, Integer]()
- val tableBlocks = theSplit.blocksDetails
- tableBlocks.foreach { tableBlock =>
- tableBlock.getLocations.foreach { location =>
- if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
- val currentCount = blockMap.get(location)
- if (currentCount == null) {
- blockMap.put(location, 1)
- } else {
- blockMap.put(location, currentCount + 1)
- }
- }
- }
- }
-
- val sortedList = blockMap.entrySet().asScala.toSeq.sortWith {(nodeCount1, nodeCount2) =>
- nodeCount1.getValue > nodeCount2.getValue
- }
-
- val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
- firstOptionLocation ++ sortedNodesList
- }
- }
-}
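getPreferredLocations in the deleted NewCarbonDataLoadRDD ranks candidate hosts by how many of the split's blocks they hold and returns the primary host plus the top two others. A small self-contained sketch of that ranking, with a hypothetical Block case class standing in for BlockDetails:

  // `Block` is a hypothetical stand-in; only the block locations matter here.
  case class Block(locations: Seq[String])

  def preferredHosts(primary: String, blocks: Seq[Block], extra: Int = 2): Seq[String] = {
    val counts = scala.collection.mutable.LinkedHashMap[String, Int]()
    for (block <- blocks; host <- block.locations if !host.equalsIgnoreCase(primary)) {
      counts(host) = counts.getOrElse(host, 0) + 1
    }
    // hosts holding the most blocks come first; keep only `extra` of them
    val ranked = counts.toSeq.sortBy { case (_, n) => -n }.map(_._1).take(extra)
    primary +: ranked
  }

For example, preferredHosts("node1", blocks) returns "node1" followed by up to two other hosts in decreasing block-count order.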
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1a1b18d0/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala b/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala
deleted file mode 100644
index 84f398a..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mapred
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.util.SerializableConfiguration
-
-/**
- * It is just a dummy trait used to access Spark's package-restricted SparkHadoopMapReduceUtil.
- */
-trait CarbonHadoopMapReduceUtil extends SparkHadoopMapReduceUtil {
-
-}
-
-class CarbonSerializableConfiguration(@transient var conf: Configuration)
- extends SerializableConfiguration(conf)
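The wrapper removed above exists because Hadoop's Configuration is Writable but not java.io.Serializable, so it cannot be broadcast or captured in a closure directly. A minimal sketch of such a wrapper, written without depending on Spark's internal SerializableConfiguration (the class name here is illustrative):

  import java.io.{ObjectInputStream, ObjectOutputStream}
  import org.apache.hadoop.conf.Configuration

  class SerializableHadoopConf(@transient var value: Configuration) extends Serializable {
    // Configuration implements Writable, so delegate to its write/readFields methods.
    private def writeObject(out: ObjectOutputStream): Unit = {
      out.defaultWriteObject()
      value.write(out)
    }
    private def readObject(in: ObjectInputStream): Unit = {
      in.defaultReadObject()
      value = new Configuration(false)
      value.readFields(in)
    }
  }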