Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/10/31 07:00:05 UTC
[07/22] carbondata git commit: [CARBONDATA-1597] Remove spark1 integration
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
deleted file mode 100644
index 6eeeaf9..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ /dev/null
@@ -1,1088 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.concurrent._
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-import scala.util.Random
-import scala.util.control.Breaks._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
-import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
-import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
-import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.spark.util.SparkUtil
-
-import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
-import org.apache.carbondata.core.dictionary.server.DictionaryServer
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
-import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
-import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.scan.partition.PartitionUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
-import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
-import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.loading.sort.SortScopeOptions
-import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.splits.TableSplit
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil, CarbonQueryUtil}
-import org.apache.carbondata.spark._
-import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
-
-/**
- * This is the factory class which can create different RDD depends on user needs.
- *
- */
-object CarbonDataRDDFactory {
-
- private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def alterTableForCompaction(sqlContext: SQLContext,
- alterTableModel: AlterTableModel,
- carbonLoadModel: CarbonLoadModel,
- storePath: String,
- storeLocation: String): Unit = {
- var compactionSize: Long = 0
- var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
- if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
- compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
- compactionType = CompactionType.MAJOR_COMPACTION
- } else if (alterTableModel.compactionType
- .equalsIgnoreCase(CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString)) {
- compactionType = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
- if (alterTableModel.segmentUpdateStatusManager.get != None) {
- carbonLoadModel
- .setSegmentUpdateStatusManager((alterTableModel.segmentUpdateStatusManager.get))
- carbonLoadModel
- .setLoadMetadataDetails(alterTableModel.segmentUpdateStatusManager.get
- .getLoadMetadataDetails.toList.asJava)
- }
- }
- else {
- compactionType = CompactionType.MINOR_COMPACTION
- }
-
- LOGGER.audit(s"Compaction request received for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-
- if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
- }
- // reading the start time of data load.
- val loadStartTime = CarbonUpdateUtil.readCurrentTime();
- carbonLoadModel.setFactTimeStamp(loadStartTime)
-
- val isCompactionTriggerByDDl = true
- val compactionModel = CompactionModel(compactionSize,
- compactionType,
- carbonTable,
- isCompactionTriggerByDDl
- )
-
- val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
- CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
- )
- .equalsIgnoreCase("true")
-
- // if system level compaction is enabled then only one compaction can run in the system
- // if any other request comes at this time then it will create a compaction request file.
- // so that this will be taken up by the compaction process which is executing.
- if (!isConcurrentCompactionAllowed) {
- LOGGER.info("System level compaction lock is enabled.")
- handleCompactionForSystemLocking(sqlContext,
- carbonLoadModel,
- storePath,
- storeLocation,
- compactionType,
- carbonTable,
- compactionModel
- )
- } else {
- // normal flow of compaction
- val lock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.COMPACTION_LOCK
- )
-
- if (lock.lockWithRetries()) {
- LOGGER.info("Acquired the compaction lock for table" +
- s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- try {
- startCompactionThreads(sqlContext,
- carbonLoadModel,
- storePath,
- storeLocation,
- compactionModel,
- lock
- )
- } catch {
- case e: Exception =>
- LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
- lock.unlock()
- }
- } else {
- LOGGER.audit("Not able to acquire the compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- LOGGER.error(s"Not able to acquire the compaction lock for table" +
- s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- sys.error("Table is already locked for compaction. Please try after some time.")
- }
- }
- }
-
- def handleCompactionForSystemLocking(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- storePath: String,
- storeLocation: String,
- compactionType: CompactionType,
- carbonTable: CarbonTable,
- compactionModel: CompactionModel): Unit = {
- val lock = CarbonLockFactory
- .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
- LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
- )
- if (lock.lockWithRetries()) {
- LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
- s".${ carbonLoadModel.getTableName }")
- try {
- startCompactionThreads(sqlContext,
- carbonLoadModel,
- storePath,
- storeLocation,
- compactionModel,
- lock
- )
- } catch {
- case e: Exception =>
- LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
- lock.unlock()
- // if the compaction is a blocking call then only need to throw the exception.
- if (compactionModel.isDDLTrigger) {
- throw e
- }
- }
- } else {
- LOGGER.audit("Not able to acquire the system level compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- LOGGER.error("Not able to acquire the compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- CarbonCompactionUtil
- .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
- // do sys error only in case of DDL trigger.
- if (compactionModel.isDDLTrigger) {
- sys.error("Compaction is in progress, compaction request for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
- " is in queue.")
- } else {
- LOGGER.error("Compaction is in progress, compaction request for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
- " is in queue.")
- }
- }
- }
-
- def startCompactionThreads(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- storePath: String,
- storeLocation: String,
- compactionModel: CompactionModel,
- compactionLock: ICarbonLock): Unit = {
- val executor: ExecutorService = Executors.newFixedThreadPool(1)
- // update the updated table status.
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
- val compactionThread = new Thread {
- override def run(): Unit = {
-
- try {
- // compaction status of the table which is triggered by the user.
- var triggeredCompactionStatus = false
- var exception: Exception = null
- try {
- DataManagementFunc.executeCompaction(carbonLoadModel: CarbonLoadModel,
- compactionModel: CompactionModel,
- executor, sqlContext, storeLocation
- )
- triggeredCompactionStatus = true
- } catch {
- case e: Exception =>
- LOGGER.error(s"Exception in compaction thread ${ e.getMessage }")
- exception = e
- }
- // continue in case of exception also, check for all the tables.
- val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
- CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
- ).equalsIgnoreCase("true")
-
- if (!isConcurrentCompactionAllowed) {
- LOGGER.info("System level compaction lock is enabled.")
- val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
- var table: CarbonTable = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata.
- tablesMeta.map(_.carbonTable).toArray,
- skipCompactionTables.toList.asJava)
- while (null != table) {
- LOGGER.info("Compaction request has been identified for table " +
- s"${ table.getDatabaseName }." +
- s"${ table.getFactTableName }")
- val metadataPath = table.getMetaDataFilepath
- val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
-
- val newCarbonLoadModel = new CarbonLoadModel()
- DataManagementFunc.prepareCarbonLoadModel(table, newCarbonLoadModel)
-
- val compactionSize = CarbonDataMergerUtil
- .getCompactionSize(CompactionType.MAJOR_COMPACTION)
-
- val newcompactionModel = CompactionModel(compactionSize,
- compactionType,
- table,
- compactionModel.isDDLTrigger
- )
- // proceed for compaction
- try {
- DataManagementFunc.executeCompaction(newCarbonLoadModel,
- newcompactionModel,
- executor, sqlContext, storeLocation
- )
- } catch {
- case e: Exception =>
- LOGGER.error("Exception in compaction thread for table " +
- s"${ table.getDatabaseName }." +
- s"${ table.getFactTableName }")
- // not handling the exception. only logging as this is not the table triggered
- // by user.
- } finally {
- // delete the compaction required file in case of failure or success also.
- if (!CarbonCompactionUtil
- .deleteCompactionRequiredFile(metadataPath, compactionType)) {
- // if the compaction request file is not been able to delete then
- // add those tables details to the skip list so that it wont be considered next.
- skipCompactionTables.+=:(table.getCarbonTableIdentifier)
- LOGGER.error("Compaction request file can not be deleted for table " +
- s"${ table.getDatabaseName }." +
- s"${ table.getFactTableName }")
- }
- }
- // ********* check again for all the tables.
- table = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
- .tablesMeta.map(_.carbonTable).toArray, skipCompactionTables.asJava
- )
- }
- // giving the user his error for telling in the beeline if his triggered table
- // compaction is failed.
- if (!triggeredCompactionStatus) {
- throw new Exception("Exception in compaction " + exception.getMessage)
- }
- }
- } finally {
- executor.shutdownNow()
- DataManagementFunc.deletePartialLoadsInCompaction(carbonLoadModel)
- compactionLock.unlock()
- }
- }
- }
- // calling the run method of a thread to make the call as blocking call.
- // in the future we may make this as concurrent.
- compactionThread.run()
- }
-
- def loadCarbonData(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- storePath: String,
- columnar: Boolean,
- partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
- result: Option[DictionaryServer],
- overwriteTable: Boolean,
- dataFrame: Option[DataFrame] = None,
- updateModel: Option[UpdateTableModel] = None): Unit = {
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val isAgg = false
- // for handling of the segment Merging.
- def handleSegmentMerging(): Unit = {
- LOGGER.info(s"compaction need status is" +
- s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
- if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
- LOGGER.audit(s"Compaction request received for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- val compactionSize = 0
- val isCompactionTriggerByDDl = false
- val compactionModel = CompactionModel(compactionSize,
- CompactionType.MINOR_COMPACTION,
- carbonTable,
- isCompactionTriggerByDDl
- )
- var storeLocation = ""
- val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
- if (null != configuredStore && configuredStore.nonEmpty) {
- storeLocation = configuredStore(Random.nextInt(configuredStore.length))
- }
- if (storeLocation == null) {
- storeLocation = System.getProperty("java.io.tmpdir")
- }
- storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
-
- val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
- CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
- )
- .equalsIgnoreCase("true")
-
- if (!isConcurrentCompactionAllowed) {
-
- handleCompactionForSystemLocking(sqlContext,
- carbonLoadModel,
- storePath,
- storeLocation,
- CompactionType.MINOR_COMPACTION,
- carbonTable,
- compactionModel
- )
- } else {
- val lock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.COMPACTION_LOCK
- )
-
- if (lock.lockWithRetries()) {
- LOGGER.info("Acquired the compaction lock.")
- try {
- startCompactionThreads(sqlContext,
- carbonLoadModel,
- storePath,
- storeLocation,
- compactionModel,
- lock
- )
- } catch {
- case e: Exception =>
- LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
- lock.unlock()
- throw e
- }
- } else {
- LOGGER.audit("Not able to acquire the compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${
- carbonLoadModel
- .getTableName
- }")
- LOGGER.error("Not able to acquire the compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${
- carbonLoadModel
- .getTableName
- }")
- }
- }
- }
- }
-
- def updateStatus(loadStatus: String,
- stat: Array[(String, (LoadMetadataDetails, ExecutionErrors))]) = {
- val metadataDetails = if (stat != null && stat(0) != null) {
- stat(0)._2._1
- } else {
- new LoadMetadataDetails
- }
- CarbonLoaderUtil
- .populateNewLoadMetaEntry(metadataDetails,
- loadStatus,
- carbonLoadModel.getFactTimeStamp,
- true)
- val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
- carbonLoadModel, false, overwriteTable)
- if (!status) {
- val errorMessage = "Dataload failed due to failure in table status updation."
- LOGGER.audit("Data load is failed for " +
- s"${ carbonLoadModel.getDatabaseName }.${
- carbonLoadModel
- .getTableName
- }")
- LOGGER.error("Dataload failed due to failure in table status updation.")
- throw new Exception(errorMessage)
- }
- }
-
- try {
- LOGGER.audit(s"Data load request has been received for table" +
- s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- // Check if any load need to be deleted before loading new data
- DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, storePath, false, carbonTable)
- // get partition way from configuration
- // val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
- // CarbonCommonConstants.TABLE_SPLIT_PARTITION,
- // CarbonCommonConstants.TABLE_SPLIT_PARTITION_DEFAULT_VALUE).toBoolean
- val isTableSplitPartition = false
- var blocksGroupBy: Array[(String, Array[BlockDetails])] = null
- var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
- var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
-
- def loadDataFile(): Unit = {
- if (isTableSplitPartition) {
- /*
- * when data handle by table split partition
- * 1) get partition files, direct load or not will get the different files path
- * 2) get files blocks by using SplitUtils
- * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy
- */
- var splits = Array[TableSplit]()
- if (carbonLoadModel.isDirectLoad) {
- // get all table Splits, this part means files were divide to different partitions
- splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
- // get all partition blocks from file list
- blocksGroupBy = splits.map {
- split =>
- val pathBuilder = new StringBuilder()
- for (path <- split.getPartition.getFilesPath.asScala) {
- pathBuilder.append(path).append(",")
- }
- if (pathBuilder.nonEmpty) {
- pathBuilder.substring(0, pathBuilder.size - 1)
- }
- (split.getPartition.getUniqueID, SparkUtil.getSplits(pathBuilder.toString(),
- sqlContext.sparkContext
- ))
- }
- } else {
- // get all table Splits,when come to this, means data have been partition
- splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, null)
- // get all partition blocks from factFilePath/uniqueID/
- blocksGroupBy = splits.map {
- split =>
- val pathBuilder = new StringBuilder()
- pathBuilder.append(carbonLoadModel.getFactFilePath)
- if (!carbonLoadModel.getFactFilePath.endsWith("/")
- && !carbonLoadModel.getFactFilePath.endsWith("\\")) {
- pathBuilder.append("/")
- }
- pathBuilder.append(split.getPartition.getUniqueID).append("/")
- (split.getPartition.getUniqueID,
- SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
- }
- }
- } else {
- /*
- * when data load handle by node partition
- * 1)clone the hadoop configuration,and set the file path to the configuration
- * 2)use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get splits,size info
- * 3)use CarbonLoaderUtil.nodeBlockMapping to get mapping info of node and block,
- * for locally writing carbondata files(one file one block) in nodes
- * use NewCarbonDataLoadRDD to load data and write to carbondata files
- */
- val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
- // FileUtils will skip file which is no csv, and return all file path which split by ','
- val filePaths = carbonLoadModel.getFactFilePath
- hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths)
- hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
- hadoopConfiguration.set("io.compression.codecs",
- """org.apache.hadoop.io.compress.GzipCodec,
- org.apache.hadoop.io.compress.DefaultCodec,
- org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
-
- CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
-
- val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
- val jobContext = new Job(hadoopConfiguration)
- val rawSplits = inputFormat.getSplits(jobContext).toArray
- val blockList = rawSplits.map { inputSplit =>
- val fileSplit = inputSplit.asInstanceOf[FileSplit]
- new TableBlockInfo(fileSplit.getPath.toString,
- fileSplit.getStart, "1",
- fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null
- ).asInstanceOf[Distributable]
- }
- // group blocks to nodes, tasks
- val startTime = System.currentTimeMillis
- val activeNodes = DistributionUtil
- .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
- val nodeBlockMapping =
- CarbonLoaderUtil
- .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
- .toSeq
- val timeElapsed: Long = System.currentTimeMillis - startTime
- LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
- LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
- s"No.of Nodes: ${nodeBlockMapping.size}")
- var str = ""
- nodeBlockMapping.foreach(entry => {
- val tableBlock = entry._2
- str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size()
- tableBlock.asScala.foreach(tableBlockInfo =>
- if (!tableBlockInfo.getLocations.exists(hostentry =>
- hostentry.equalsIgnoreCase(entry._1)
- )) {
- str = str + " , mismatch locations: " + tableBlockInfo.getLocations
- .foldLeft("")((a, b) => a + "," + b)
- }
- )
- str = str + "\n"
- }
- )
- LOGGER.info(str)
- blocksGroupBy = nodeBlockMapping.map(entry => {
- val blockDetailsList =
- entry._2.asScala.map(distributable => {
- val tableBlock = distributable.asInstanceOf[TableBlockInfo]
- new BlockDetails(new Path(tableBlock.getFilePath),
- tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
- )
- }).toArray
- (entry._1, blockDetailsList)
- }
- ).toArray
- }
-
- status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
- new DataLoadResultImpl(),
- carbonLoadModel,
- blocksGroupBy,
- isTableSplitPartition).collect()
- }
-
- def loadDataFrame(): Unit = {
- try {
- val rdd = dataFrame.get.rdd
- val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
- DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
- }.distinct.size
- val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
- sqlContext.sparkContext)
- val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
- var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length
- numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length))
- val coalesceRdd = rdd.coalesce(numPartitions, shuffle = false)
-
- status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
- new DataLoadResultImpl(),
- carbonLoadModel,
- newRdd).collect()
- } catch {
- case ex: Exception =>
- LOGGER.error(ex, "load data frame failed")
- throw ex
- }
- }
-
- def loadDataFrameForUpdate(): Unit = {
- val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate
-
- def triggerDataLoadForSegment(key: String, taskNo: Int,
- iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
- val rddResult = new updateResultImpl()
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
- var partitionID = "0"
- val loadMetadataDetails = new LoadMetadataDetails
- val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
- var uniqueLoadStatusId = ""
- try {
- val segId = key
- val index = taskNo
- uniqueLoadStatusId = carbonLoadModel.getTableName +
- CarbonCommonConstants.UNDERSCORE +
- (index + "_0")
-
- // convert timestamp
- val timeStampInLong = updateModel.get.updatedTimeStamp + ""
- loadMetadataDetails.setPartitionCount(partitionID)
- loadMetadataDetails.setLoadName(segId)
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
- carbonLoadModel.setPartitionId(partitionID)
- carbonLoadModel.setSegmentId(segId)
- carbonLoadModel.setTaskNo(String.valueOf(index))
- carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
-
- // During Block Spill case Increment of File Count and proper adjustment of Block
- // naming is only done when AbstractFactDataWriter.java : initializeWriter get
- // CarbondataFileName as null. For handling Block Spill not setting the
- // CarbondataFileName in case of Update.
- // carbonLoadModel.setCarbondataFileName(newBlockName)
-
- // storeLocation = CarbonDataLoadRDD.initialize(carbonLoadModel, index)
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
- UpdateDataLoad.DataLoadForUpdate(segId,
- index,
- iter,
- carbonLoadModel,
- loadMetadataDetails)
- } catch {
- case e: NoRetryException =>
- loadMetadataDetails
- .setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
- executionErrors.failureCauses = FailureCauses.BAD_RECORDS
- executionErrors.errorMsg = e.getMessage
- LOGGER.info("Bad Record Found")
- case e: Exception =>
- LOGGER.info("DataLoad failure")
- LOGGER.error(e)
- throw e
- }
-
- var finished = false
-
- override def hasNext: Boolean = !finished
-
- override def next(): (String, (LoadMetadataDetails, ExecutionErrors)) = {
- finished = true
- rddResult
- .getKey(uniqueLoadStatusId,
- (loadMetadataDetails, executionErrors))
- }
- }
- resultIter
- }
-
- val updateRdd = dataFrame.get.rdd
-
- // return directly if no rows to update
- val noRowsToUpdate = updateRdd.isEmpty()
- if (noRowsToUpdate) {
- res = Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]()
- return
- }
-
- // splitting as (key, value) i.e., (segment, updatedRows)
- val keyRDD = updateRdd.map(row =>
- (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
-
- val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
- carbonTable.getMetaDataFilepath)
- val segmentIds = loadMetadataDetails.map(_.getLoadName)
- val segmentIdIndex = segmentIds.zipWithIndex.toMap
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
- carbonTable.getCarbonTableIdentifier)
- val segmentId2maxTaskNo = segmentIds.map { segId =>
- (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath))
- }.toMap
-
- class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
- extends org.apache.spark.Partitioner {
- override def numPartitions: Int = segmentIdIndex.size * parallelism
-
- override def getPartition(key: Any): Int = {
- val segId = key.asInstanceOf[String]
- // partitionId
- segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
- }
- }
-
- val partitionByRdd = keyRDD.partitionBy(new SegmentPartitioner(segmentIdIndex,
- segmentUpdateParallelism))
-
- // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
- // so segmentIdIndex=partitionId/parallelism, this has been verified.
- res = partitionByRdd.map(_._2).mapPartitions { partition =>
- val partitionId = TaskContext.getPartitionId()
- val segIdIndex = partitionId / segmentUpdateParallelism
- val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
- val segId = segmentIds(segIdIndex)
- val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1
-
- List(triggerDataLoadForSegment(segId, newTaskNo, partition).toList).toIterator
- }.collect()
- }
-
- def loadDataForPartitionTable(): Unit = {
- try {
- val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel)
- status = new PartitionTableDataLoaderRDD(sqlContext.sparkContext,
- new DataLoadResultImpl(),
- carbonLoadModel,
- rdd).collect()
- } catch {
- case ex: Exception =>
- LOGGER.error(ex, "load data failed for partition table")
- throw ex
- }
- }
-
- // create new segment folder in carbon store
- if (!updateModel.isDefined) {
- CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
- carbonLoadModel.getSegmentId, carbonTable)
- }
- var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- var errorMessage: String = "DataLoad failure"
- var executorMessage: String = ""
- val configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel)
- val sortScope = CarbonDataProcessorUtil.getSortScope(configuration)
- try {
- if (updateModel.isDefined) {
- loadDataFrameForUpdate()
- } else if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
- loadDataForPartitionTable()
- } else if (configuration.isSortTable &&
- sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
- LOGGER.audit("Using global sort for loading.")
- status = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
- dataFrame, carbonLoadModel)
- } else if (dataFrame.isDefined) {
- loadDataFrame()
- } else {
- loadDataFile()
- }
- if (updateModel.isDefined) {
-
- res.foreach(resultOfSeg => resultOfSeg.foreach(
- resultOfBlock => {
- if (resultOfBlock._2._1.getLoadStatus
- .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
- loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
- updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
- updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
- }
- else {
- updateModel.get.executorErrors = resultOfBlock._2._2
- }
- } else if (resultOfBlock._2._1.getLoadStatus
- .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
- loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
- updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
- updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
- }
- }
- ))
-
- }
- else {
- val newStatusMap = scala.collection.mutable.Map.empty[String, String]
- if (status.nonEmpty) {
- status.foreach { eachLoadStatus =>
- val state = newStatusMap.get(eachLoadStatus._1)
- state match {
- case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
- newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
- case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
- if eachLoadStatus._2._1.getLoadStatus ==
- CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
- newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
- case _ =>
- newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
- }
- }
-
- newStatusMap.foreach {
- case (key, value) =>
- if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
- loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
- !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
- loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
- }
- }
- } else {
- loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- }
-
- if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE &&
- partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) {
- loadStatus = partitionStatus
- }
- }
- } catch {
- case ex: Throwable =>
- loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- ex match {
- case sparkException: SparkException =>
- if (sparkException.getCause.isInstanceOf[DataLoadingException] ||
- sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
- executorMessage = sparkException.getCause.getMessage
- errorMessage = errorMessage + ": " + executorMessage
- }
- case _ =>
- executorMessage = ex.getCause.getMessage
- errorMessage = errorMessage + ": " + executorMessage
- }
- LOGGER.info(errorMessage)
- LOGGER.error(ex)
- }
- // handle the status file updation for the update cmd.
- if (updateModel.isDefined) {
-
- if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
- // updateModel.get.executorErrors.errorMsg = errorMessage
- if (updateModel.get.executorErrors.failureCauses == FailureCauses.NONE) {
- updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
- if (null != executorMessage && !executorMessage.isEmpty) {
- updateModel.get.executorErrors.errorMsg = executorMessage
- } else {
- updateModel.get.executorErrors.errorMsg = "Update failed as the data load has failed."
- }
- }
- return
- } else if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
- updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
- carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
- return
- } else {
- // in success case handle updation of the table status file.
- // success case.
- val segmentDetails = new util.HashSet[String]()
-
- var resultSize = 0
-
- res.foreach(resultOfSeg => {
- resultSize = resultSize + resultOfSeg.size
- resultOfSeg.foreach(
- resultOfBlock => {
- segmentDetails.add(resultOfBlock._2._1.getLoadName)
- }
- )}
- )
-
- // this means that the update doesnt have any records to update so no need to do table
- // status file updation.
- if (resultSize == 0) {
- LOGGER.audit("Data update is successful with 0 rows updation for " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
- return
- }
-
- if (
- CarbonUpdateUtil
- .updateTableMetadataStatus(segmentDetails,
- carbonTable,
- updateModel.get.updatedTimeStamp + "",
- true,
- new util.ArrayList[String](0))) {
- LOGGER.audit("Data update is successful for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- }
- else {
- val errorMessage = "Data update failed due to failure in table status updation."
- LOGGER.audit("Data update is failed for " +
- s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
- LOGGER.error("Data update failed due to failure in table status updation.")
- updateModel.get.executorErrors.errorMsg = errorMessage
- updateModel.get.executorErrors.failureCauses = FailureCauses
- .STATUS_FILE_UPDATION_FAILURE
- return
- }
-
- }
-
- return
- }
- if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
- LOGGER.info("********starting clean up**********")
- CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
- LOGGER.info("********clean up done**********")
- LOGGER.audit(s"Data load is failed for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- LOGGER.warn("Cannot write load metadata file as data load failed")
- updateStatus(loadStatus, status)
- throw new Exception(errorMessage)
- } else {
- // check if data load fails due to bad record and throw data load failure due to
- // bad record exception
- if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
- status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
- carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
- LOGGER.info("********starting clean up**********")
- CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
- LOGGER.info("********clean up done**********")
- LOGGER.audit(s"Data load is failed for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- updateStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE, status)
- throw new Exception(status(0)._2._2.errorMsg)
- }
- if (!isAgg) {
- writeDictionary(carbonLoadModel, result)
- updateStatus(loadStatus, status)
- } else if (!carbonLoadModel.isRetentionRequest) {
- // TODO : Handle it
- LOGGER.info("********Database updated**********")
- }
-
- if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
- LOGGER.audit("Data load is partially successful for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- } else {
- LOGGER.audit("Data load is successful for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- }
- try {
- // compaction handling
- handleSegmentMerging()
- } catch {
- case e: Exception =>
- throw new Exception(
- "Dataload is success. Auto-Compaction has failed. Please check logs.")
- }
- }
- }
-
- }
-
- /**
- * repartition the input data for partiton table.
- * @param sqlContext
- * @param dataFrame
- * @param carbonLoadModel
- * @return
- */
- private def repartitionInputData(sqlContext: SQLContext,
- dataFrame: Option[DataFrame],
- carbonLoadModel: CarbonLoadModel): RDD[Row] = {
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
- val partitionColumn = partitionInfo.getColumnSchemaList.get(0).getColumnName
- val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
- val columns = carbonLoadModel.getCsvHeaderColumns
- var partitionColumnIndex = -1
- breakable {
- for (i <- 0 until columns.length) {
- if (partitionColumn.equalsIgnoreCase(columns(i))) {
- partitionColumnIndex = i
- break
- }
- }
- }
- if (partitionColumnIndex == -1) {
- throw new DataLoadingException("Partition column not found.")
- }
-
- val dateFormatMap = CarbonDataProcessorUtil.getDateFormatMap(carbonLoadModel.getDateFormat())
- val specificFormat = Option(dateFormatMap.get(partitionColumn.toLowerCase))
- val timeStampFormat = if (specificFormat.isDefined) {
- new SimpleDateFormat(specificFormat.get)
- } else {
- val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
- .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
- new SimpleDateFormat(timestampFormatString)
- }
-
- val dateFormat = if (specificFormat.isDefined) {
- new SimpleDateFormat(specificFormat.get)
- } else {
- val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
- .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
- new SimpleDateFormat(dateFormatString)
- }
-
- // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
- val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
- // input data from DataFrame
- val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
- val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
- val serializationNullFormat =
- carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
- dataFrame.get.rdd.map { row =>
- if (null != row && row.length > partitionColumnIndex &&
- null != row.get(partitionColumnIndex)) {
- (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
- delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
- } else {
- (null, row)
- }
- }
- } else {
- // input data from csv files
- val hadoopConfiguration = new Configuration()
- CommonUtil.configureCSVInputFormat(hadoopConfiguration, carbonLoadModel)
- hadoopConfiguration.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
- val columnCount = columns.length
- new NewHadoopRDD[NullWritable, StringArrayWritable](
- sqlContext.sparkContext,
- classOf[CSVInputFormat],
- classOf[NullWritable],
- classOf[StringArrayWritable],
- hadoopConfiguration
- ).map { currentRow =>
- if (null == currentRow || null == currentRow._2) {
- val row = new StringArrayRow(new Array[String](columnCount))
- (null, row)
- } else {
- val row = new StringArrayRow(new Array[String](columnCount))
- val values = currentRow._2.get()
- if (values != null && values.length > partitionColumnIndex) {
- (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
- } else {
- (null, row.setValues(currentRow._2.get()))
- }
- }
- }
- }
-
- val partitioner = PartitionFactory.getPartitioner(partitionInfo)
- if (partitionColumnDataType == DataTypes.STRING) {
- if (partitionInfo.getPartitionType == PartitionType.RANGE) {
- inputRDD.map { row => (ByteUtil.toBytes(row._1), row._2) }
- .partitionBy(partitioner)
- .map(_._2)
- } else {
- inputRDD.partitionBy(partitioner)
- .map(_._2)
- }
- } else {
- inputRDD.map { row =>
- (PartitionUtil.getDataBasedOnDataType(row._1, partitionColumnDataType, timeStampFormat,
- dateFormat), row._2)
- }
- .partitionBy(partitioner)
- .map(_._2)
- }
- }
-
- private def writeDictionary(carbonLoadModel: CarbonLoadModel,
- result: Option[DictionaryServer]) = {
- // write dictionary file and shutdown dictionary server
- val uniqueTableName: String = s"${ carbonLoadModel.getDatabaseName }_${
- carbonLoadModel.getTableName }"
- result match {
- case Some(server) =>
- try {
- server.writeTableDictionary(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- .getCarbonTableIdentifier.getTableId)
- } catch {
- case ex: Exception =>
- LOGGER.error(s"Error while writing dictionary file for $uniqueTableName")
- throw new Exception("Dataload failed due to error while writing dictionary file!")
- }
- case _ =>
- }
- }
-
-}
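
For reference, the arithmetic the deleted SegmentPartitioner relies on
(partitionId = segIdIndex * parallelism + randomPart, with randomPart <
parallelism, so segIdIndex = partitionId / parallelism) can be sketched
standalone. The names below are hypothetical demo names, not part of the
removed file:

import scala.util.Random

// Maps a segment id to one of `parallelism` partitions reserved for it.
class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int) {
  def numPartitions: Int = segIdIndex.size * parallelism
  def getPartition(segId: String): Int =
    segIdIndex(segId) * parallelism + Random.nextInt(parallelism)
}

object SegmentPartitionerDemo extends App {
  val parallelism = 3
  val partitioner = new SegmentPartitioner(Map("0" -> 0, "1" -> 1), parallelism)
  val pid = partitioner.getPartition("1")   // some value in [3, 6)
  val segIdIndex = pid / parallelism        // recovers 1, as in the mapPartitions body
  val randomPart = pid - segIdIndex * parallelism
  println(s"pid=$pid segIdIndex=$segIdIndex randomPart=$randomPart")
}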
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
deleted file mode 100644
index f8275d1..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.thriftserver
-
-import java.io.File
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.CarbonContext
-import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-
-object CarbonThriftServer {
-
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf()
- .setAppName("Carbon Thrift Server")
- if (!conf.contains("carbon.properties.filepath")) {
- val sparkHome = System.getenv.get("SPARK_HOME")
- if (sparkHome != null) {
- val file = new File(sparkHome + '/' + "conf" + '/' + "carbon.properties")
- if (file.exists()) {
- conf.set("carbon.properties.filepath", file.getCanonicalPath)
- System.setProperty("carbon.properties.filepath", file.getCanonicalPath)
- }
- }
- } else {
- System.setProperty("carbon.properties.filepath", conf.get("carbon.properties.filepath"))
- }
- if (org.apache.spark.SPARK_VERSION.startsWith("1.6")) {
- conf.set("spark.sql.hive.thriftServer.singleSession", "true")
- }
- val sc = new SparkContext(conf)
- val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000")
- try {
- Thread.sleep(Integer.parseInt(warmUpTime))
- } catch {
- case e: Exception =>
- val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- LOG.error(s"Wrong value for carbon.spark.warmUpTime $warmUpTime " +
- "Using default Value and proceeding")
- Thread.sleep(30000)
- }
- val storePath = if (args.length > 0) args.head else null
- val cc = new CarbonContext(sc, storePath)
-
- HiveThriftServer2.startWithContext(cc)
- }
-
-}
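
The lookup order the deleted main() used for carbon.properties (an explicit
setting first, then $SPARK_HOME/conf/carbon.properties if it exists) can be
isolated into a small helper. This is a sketch with hypothetical names, not
the removed code itself:

import java.io.File

object CarbonPropsLocator {
  // Returns the explicit path when given one, otherwise probes SPARK_HOME.
  def resolve(explicitPath: Option[String]): Option[String] =
    explicitPath.orElse {
      Option(System.getenv("SPARK_HOME")).flatMap { sparkHome =>
        val file = new File(sparkHome + File.separator + "conf" +
          File.separator + "carbon.properties")
        if (file.exists()) Some(file.getCanonicalPath) else None
      }
    }
}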
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
deleted file mode 100644
index 118249a..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
-
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
-
-case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
-
-object CarbonSparkUtil {
-
- def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
- val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
- .asScala.map(x => x.getColName) // wf : may be problem
- val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
- .asScala.map(x => x.getColName)
- val dictionary =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
- (f.getColName.toLowerCase,
- f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
- !f.getDataType.isComplexType)
- }
- CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
- }
-}
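
The dictionary map built by createSparkMeta marks a dimension as
dictionary-backed only when it carries the DICTIONARY encoding but neither
DIRECT_DICTIONARY nor a complex data type. A standalone sketch of that
predicate, with stand-in types (an assumption) for the carbon-core ones:

object Encoding extends Enumeration {
  val DICTIONARY, DIRECT_DICTIONARY = Value
}

case class Dimension(colName: String, encodings: Set[Encoding.Value], isComplex: Boolean)

object DictionaryMapBuilder {
  // (column name, uses lookup dictionary) pairs, as consumed by DictionaryMap.
  def build(dimensions: Seq[Dimension]): Map[String, Boolean] =
    dimensions.map { f =>
      (f.colName.toLowerCase,
        f.encodings.contains(Encoding.DICTIONARY) &&
        !f.encodings.contains(Encoding.DIRECT_DICTIONARY) &&
        !f.isComplex)
    }.toMap
}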
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
deleted file mode 100644
index 4950227..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
-
-
-/**
- * All the utility functions for carbon plan creation
- */
-object QueryPlanUtil {
-
- /**
- * createCarbonInputFormat from query model
- */
- def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
- (CarbonTableInputFormat[Array[Object]], Job) = {
- val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
- val jobConf: JobConf = new JobConf(new Configuration)
- val job: Job = new Job(jobConf)
- FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
- (carbonInputFormat, job)
- }
-
- def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
- conf: Configuration) : CarbonTableInputFormat[V] = {
- val carbonInputFormat = new CarbonTableInputFormat[V]()
- val job: Job = new Job(conf)
- FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
- carbonInputFormat
- }
-}
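
Both deleted helpers performed the same wiring: instantiate the input
format, create a Hadoop Job, and register the table path as the job's input
directory. A minimal sketch using Hadoop's TextInputFormat as a stand-in
(an assumption) for CarbonTableInputFormat:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

object InputFormatWiring {
  def forTablePath(tablePath: String): (TextInputFormat, Job) = {
    // Job.getInstance replaces the deprecated `new Job(conf)` used above.
    val job = Job.getInstance(new Configuration())
    FileInputFormat.addInputPath(job, new Path(tablePath))
    (new TextInputFormat, job)
  }
}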
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
deleted file mode 100644
index ea75ccb..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark
-
-import java.lang.Long
-
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.{DataReadMethod, InputMetrics}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.hadoop.{CarbonMultiBlockSplit, InputMetricsStats}
-import org.apache.carbondata.spark.InitInputMetrics
-
-/**
- * It gives statistics of number of bytes and record read
- */
-class CarbonInputMetrics extends InitInputMetrics {
- @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- var inputMetrics: InputMetrics = _
- var bytesReadCallback: Option[() => scala.Long] = _
- var carbonMultiBlockSplit: CarbonMultiBlockSplit = _
-
- def initBytesReadCallback(context: TaskContext,
- carbonMultiBlockSplit: CarbonMultiBlockSplit) {
- inputMetrics = context.taskMetrics().getInputMetricsForReadMethod(DataReadMethod.Hadoop)
- this.carbonMultiBlockSplit = carbonMultiBlockSplit;
- bytesReadCallback = carbonMultiBlockSplit match {
- case _: CarbonMultiBlockSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
- case _ => None
- }
- }
-
- def incrementRecordRead(recordRead: Long) {
- inputMetrics.incRecordsRead(recordRead)
- }
-
- def updateAndClose() {
- if (bytesReadCallback.isDefined) {
- inputMetrics.updateBytesRead()
- } else if (carbonMultiBlockSplit.isInstanceOf[CarbonMultiBlockSplit]) {
- // If we can't get the bytes read from the FS stats, fall back to the split size,
- // which may be inaccurate.
- try {
- inputMetrics.incBytesRead(carbonMultiBlockSplit.getLength)
- } catch {
- case e: java.io.IOException =>
- LOGGER.warn("Unable to get input size to set InputMetrics for task:" + e.getMessage)
- }
- }
- }
-}
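
The close path above prefers the filesystem bytes-read callback and only
falls back to the (possibly inaccurate) split length when no callback was
obtained. That decision can be sketched independently of Spark's metrics
classes; the names below are stand-ins (an assumption):

class BytesReadReporter(
    bytesReadCallback: Option[() => Long],
    splitLength: () => Long,
    incBytesRead: Long => Unit) {

  def updateAndClose(): Unit = bytesReadCallback match {
    case Some(read) => incBytesRead(read()) // exact figure from the FS stats
    case None =>
      try {
        incBytesRead(splitLength())         // fallback, may be inaccurate
      } catch {
        case e: java.io.IOException =>
          Console.err.println("Unable to get input size for task: " + e.getMessage)
      }
  }
}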
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
deleted file mode 100644
index a6c28a9..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExprId, LeafExpression, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.DataType
-
-import org.apache.carbondata.core.scan.expression.ColumnExpression
-
-case class CastExpr(expr: Expression) extends Filter
-
-case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
- extends LeafExpression with NamedExpression with CodegenFallback {
-
- type EvaluatedType = Any
-
- override def toString: String = s"input[" + colExp.getColIndex + "]"
-
- override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
-
- override def name: String = colExp.getColumnName
-
- override def toAttribute: Attribute = throw new UnsupportedOperationException
-
- override def exprId: ExprId = throw new UnsupportedOperationException
-
- override def qualifiers: Seq[String] = throw new UnsupportedOperationException
-}
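
CarbonBoundReference evaluates purely by position: it reads the ordinal
already resolved into the carbon ColumnExpression straight out of the
incoming row, with no name lookup at runtime. A dependency-free sketch of
that contract (SimpleRow and BoundRef are hypothetical):

case class SimpleRow(values: Array[Any]) {
  def get(ordinal: Int): Any = values(ordinal)
}

// Mirrors eval(): only the bound column index is consulted.
case class BoundRef(colIndex: Int, columnName: String) {
  def eval(input: SimpleRow): Any = input.get(colIndex)
  override def toString: String = s"input[$colIndex]"
}

object BoundRefDemo extends App {
  val row = SimpleRow(Array("c001", 42L))
  println(BoundRef(colIndex = 1, columnName = "salary").eval(row)) // 42
}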
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
deleted file mode 100644
index 024c54b..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions._
- import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.optimizer.CarbonDecoderRelation
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-import org.apache.carbondata.spark.util.CommonUtil
-
-/**
- * Top command
- */
-case class Top(count: Int, topOrBottom: Int, dim: NamedExpression, msr: NamedExpression,
- child: LogicalPlan) extends UnaryNode {
- def output: Seq[Attribute] = child.output
-
- override def references: AttributeSet = {
- val list = List(dim, msr)
- AttributeSet(list.flatMap(_.references))
- }
-}
-
-object getDB {
-
- def getDatabaseName(dbName: Option[String], sqlContext: SQLContext): String = {
- dbName.getOrElse(sqlContext.asInstanceOf[HiveContext].catalog.client.currentDatabase)
- }
-
-}
-
-/**
- * Shows the loads in a table
- */
-case class ShowLoadsCommand(databaseNameOp: Option[String], table: String, limit: Option[String])
- extends LogicalPlan with Command {
-
- override def children: Seq[LogicalPlan] = Seq.empty
-
- override def output: Seq[Attribute] = {
- Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
- AttributeReference("Status", StringType, nullable = false)(),
- AttributeReference("Load Start Time", TimestampType, nullable = false)(),
- AttributeReference("Load End Time", TimestampType, nullable = false)())
- }
-}
-
-/**
- * Describe formatted command for a Hive table
- */
-case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier)
- extends LogicalPlan with Command {
- override def children: Seq[LogicalPlan] = Seq.empty
-
- override def output: Seq[AttributeReference] =
- Seq(AttributeReference("result", StringType, nullable = false)())
-}
-
-case class CarbonDictionaryCatalystDecoder(
- relations: Seq[CarbonDecoderRelation],
- profile: CarbonProfile,
- aliasMap: CarbonAliasDecoderRelation,
- isOuter: Boolean,
- child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-}
-
-abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
- def isEmpty: Boolean = attributes.isEmpty
-}
-
-case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
-
-case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
-
-case class CreateDatabase(dbName: String, sql: String) extends LogicalPlan with Command {
- override def children: Seq[LogicalPlan] = Seq.empty
- override def output: Seq[AttributeReference] = {
- Seq()
- }
-}
-
-case class DropDatabase(dbName: String, isCascade: Boolean, sql: String)
- extends LogicalPlan with Command {
- override def children: Seq[LogicalPlan] = Seq.empty
- override def output: Seq[AttributeReference] = {
- Seq()
- }
-}
-
-case class UseDatabase(sql: String) extends LogicalPlan with Command {
- override def children: Seq[LogicalPlan] = Seq.empty
- override def output: Seq[AttributeReference] = {
- Seq()
- }
-}
-
-case class ProjectForUpdate(
- table: UnresolvedRelation,
- columns: List[String],
- children: Seq[LogicalPlan]) extends LogicalPlan with Command {
- override def output: Seq[AttributeReference] = Seq.empty
-}
-
-case class UpdateTable(
- table: UnresolvedRelation,
- columns: List[String],
- selectStmt: String,
- filer: String) extends LogicalPlan {
- override def children: Seq[LogicalPlan] = Seq.empty
- override def output: Seq[AttributeReference] = Seq.empty
-}
-
-case class DeleteRecords(
- statement: String,
- table: UnresolvedRelation) extends LogicalPlan {
- override def children: Seq[LogicalPlan] = Seq.empty
- override def output: Seq[AttributeReference] = Seq.empty
-}
-
-case class ShowPartitions(
- table: TableIdentifier) extends LogicalPlan {
- override def children: Seq[LogicalPlan] = Seq.empty
- override def output: Seq[Attribute] = CommonUtil.partitionInfoOutput
-}
-
-/**
- * A logical plan representing insertion into a Hive table.
- * Unlike InsertIntoTable, this plan ignores the nullability of ArrayType, MapType and
- * StructType, because Hive tables have no nullability for ARRAY, MAP and STRUCT types.
- */
-case class InsertIntoCarbonTable(
- table: CarbonDatasourceRelation,
- partition: Map[String, Option[String]],
- child: LogicalPlan,
- overwrite: Boolean,
- ifNotExists: Boolean)
- extends LogicalPlan with Command {
-
- override def children: Seq[LogicalPlan] = child :: Nil
- override def output: Seq[Attribute] = Seq.empty
-
- // This is the expected schema of the table prepared to be inserted into,
- // including dynamic partition columns.
- val tableOutput = table.carbonRelation.output
-}
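
Most nodes in the file above follow one pattern: a childless LogicalPlan mixed with Command, whose output sequence declares the schema of the rows the command returns. A sketch of that pattern against the same Spark 1.x Catalyst API (ShowSegmentsCommand is a hypothetical example, not a class from this file):

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.types.StringType

// Hypothetical command node, shown only to isolate the pattern.
case class ShowSegmentsCommand(table: String) extends LogicalPlan with Command {
  // no children: the node is executed by a strategy, not optimized further
  override def children: Seq[LogicalPlan] = Seq.empty
  // the declared output becomes the schema of the returned rows
  override def output: Seq[Attribute] =
    Seq(AttributeReference("Segment", StringType, nullable = false)())
}
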
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
deleted file mode 100644
index da4b210..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.io.File
-
-import scala.language.implicitConversions
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.catalyst.ParserDialect
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.ExtractPythonUDFs
-import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, PreWriteCheck}
-import org.apache.spark.sql.hive._
-import org.apache.spark.sql.optimizer.CarbonOptimizer
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
-
-class CarbonContext(
- val sc: SparkContext,
- val storePath: String,
- metaStorePath: String) extends HiveContext(sc) {
- self =>
-
- def this(sc: SparkContext) = {
- this(sc,
- null,
- new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
- }
-
- def this(sc: SparkContext, storePath: String) = {
- this(sc,
- storePath,
- new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
- }
-
- CarbonContext.addInstance(sc, this)
- CodeGenerateFactory.init(sc.version)
- udf.register("getTupleId", () => "")
- CarbonEnv.init(this)
-
- var lastSchemaUpdatedTime = System.currentTimeMillis()
- val hiveClientInterface = metadataHive
-
- protected[sql] override lazy val conf: SQLConf = new CarbonSQLConf
-
- @transient
- override lazy val catalog = {
- val carbonProperties = CarbonProperties.getInstance()
- if (storePath != null) {
- carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
- // otherwise fall back to carbon.properties, for backward compatibility
- } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) {
- carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
- conf.getConfString("spark.sql.warehouse.dir"))
- }
- new CarbonMetastore(this, storePath, metadataHive, queryId) with OverrideCatalog
- }
-
- @transient
- override protected[sql] lazy val analyzer =
- new Analyzer(catalog, functionRegistry, conf) {
-
- override val extendedResolutionRules =
- catalog.ParquetConversions ::
- catalog.CreateTables ::
- CarbonIUDAnalysisRule ::
- CarbonPreInsertionCasts ::
- ExtractPythonUDFs ::
- ResolveHiveWindowFunction ::
- PreInsertCastAndRename ::
- Nil
-
- override val extendedCheckRules = Seq(
- PreWriteCheck(catalog)
- )
- }
-
- @transient
- override protected[sql] lazy val optimizer: Optimizer =
- CarbonOptimizer.optimizer(
- CodeGenerateFactory.createDefaultOptimizer(conf, sc),
- conf.asInstanceOf[CarbonSQLConf],
- sc.version)
-
- protected[sql] override def getSQLDialect(): ParserDialect = new CarbonSQLDialect(this)
-
- experimental.extraStrategies = {
- val carbonStrategy = new CarbonStrategies(self)
- Seq(carbonStrategy.CarbonTableScan, carbonStrategy.DDLStrategies)
- }
-
- override protected def configure(): Map[String, String] = {
- sc.hadoopConfiguration.addResource("hive-site.xml")
- if (sc.hadoopConfiguration.get(CarbonCommonConstants.HIVE_CONNECTION_URL) == null) {
- val metaStorePathAbsolute = new File(metaStorePath).getCanonicalPath
- val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
- logDebug(s"metastore db is going to be created in location: $hiveMetaStoreDB")
- super.configure() ++ Map[String, String]((CarbonCommonConstants.HIVE_CONNECTION_URL,
- s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true"),
- ("hive.metastore.warehouse.dir", metaStorePathAbsolute + "/hivemetadata"))
- } else {
- super.configure()
- }
- }
-
- @transient
- private val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
-
- var queryId: String = ""
-
- override def sql(sql: String): DataFrame = {
- // queryId will be unique for each query; create the query detail holder
- queryId = System.nanoTime().toString
- this.setConf("queryId", queryId)
-
- CarbonContext.updateCarbonPorpertiesPath(this)
- val sqlString = sql.toUpperCase
- LOGGER.info(s"Query [$sqlString]")
- val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
- val statistic = new QueryStatistic()
- val logicPlan: LogicalPlan = parseSql(sql)
- statistic.addStatistics(QueryStatisticsConstants.SQL_PARSE, System.currentTimeMillis())
- recorder.recordStatisticsForDriver(statistic, queryId)
- val result = new DataFrame(this, logicPlan)
-
- // We force query optimization to happen right away instead of letting it happen lazily,
- // as it would when using the query DSL, so that DDL commands behave as expected. For DML
- // queries this only builds the RDD lineage and does not perform any execution.
- result
- }
-
-}
-
-object CarbonContext {
-
- val datasourceName: String = "org.apache.carbondata.format"
-
- val datasourceShortName: String = "carbondata"
-
- @transient
- private val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
-
- final def updateCarbonPorpertiesPath(hiveContext: HiveContext) {
- val carbonPropertiesFilePath = hiveContext.getConf("carbon.properties.filepath", null)
- val systemcarbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
- if (null != carbonPropertiesFilePath && null == systemcarbonPropertiesFilePath) {
- System.setProperty("carbon.properties.filepath",
- carbonPropertiesFilePath + "/" + "carbon.properties")
- }
- // configure the ZooKeeper URL
- val zooKeeperUrl = hiveContext.getConf("spark.deploy.zookeeper.url", "127.0.0.1:2181")
-
- CarbonProperties.getInstance().addProperty("spark.deploy.zookeeper.url", zooKeeperUrl)
-
- }
-
- // this cache is used to avoid creating multiple CarbonContext instances from the same
- // SparkContext, which would hit the Derby problem for the metastore
- private val cache = collection.mutable.Map[SparkContext, CarbonContext]()
-
- def getInstance(sc: SparkContext): CarbonContext = {
- cache(sc)
- }
-
- def addInstance(sc: SparkContext, cc: CarbonContext): Unit = {
- if (cache.contains(sc)) {
- sys.error("creating multiple instances of CarbonContext is not " +
- "allowed using the same SparkContext instance")
- }
- cache(sc) = cc
- }
-}
-
-object SQLParser {
- def parse(sql: String, sqlContext: SQLContext): LogicalPlan = sqlContext.parseSql(sql)
-}
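
For context, this is roughly how the removed entry point was driven from an application; a usage sketch against the constructors and sql method defined above (master, store path and table name are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.CarbonContext

object CarbonContextExample extends App {
  val sc = new SparkContext(new SparkConf().setAppName("carbon-example").setMaster("local[2]"))
  // the second argument is the carbon store path
  val cc = new CarbonContext(sc, "/tmp/carbon/store")
  cc.sql("CREATE TABLE IF NOT EXISTS t1 (id INT, name STRING) STORED BY 'carbondata'")
  cc.sql("SELECT name FROM t1 WHERE id > 0").show()
}

Note the addInstance guard above: constructing a second CarbonContext on the same SparkContext raises an error, precisely to avoid booting the embedded Derby metastore twice.
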
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
deleted file mode 100644
index 2c2e954..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{Job, JobID}
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.spark._
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources.{Filter, HadoopFsRelation, OutputWriterFactory}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
-
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.scan.expression.logical.AndExpression
-import org.apache.carbondata.core.util.DataTypeUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
-import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
-import org.apache.carbondata.processing.merger.TableMeta
-import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
-import org.apache.carbondata.spark.rdd.CarbonRDD
-import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
-import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
-
-private[sql] case class CarbonDatasourceHadoopRelation(
- sqlContext: SQLContext,
- paths: Array[String],
- parameters: Map[String, String],
- tableSchema: Option[StructType])
- extends HadoopFsRelation {
-
- lazy val schemaPath = new Path(CarbonTablePath.getSchemaFilePath(paths.head))
- if (!schemaPath.getFileSystem(new Configuration).exists(schemaPath)) {
- throw new IllegalArgumentException("invalid CarbonData file path: " + paths.head)
- }
-
- lazy val job = new Job(new JobConf())
- lazy val options = new CarbonOption(parameters)
- lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
- lazy val relationRaw: CarbonRelation = {
- val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
- if (carbonTable == null) {
- sys.error(s"CarbonData file path ${paths.head} is not valid")
- }
- CarbonRelation(
- carbonTable.getDatabaseName,
- carbonTable.getFactTableName,
- CarbonSparkUtil.createSparkMeta(carbonTable),
- new TableMeta(carbonTable.getCarbonTableIdentifier,
- paths.head, absIdentifier.getTablePath, carbonTable),
- None
- )(sqlContext)
- }
-
- override def dataSchema: StructType = tableSchema.getOrElse(relationRaw.schema)
-
- override def prepareJobForWrite(job: Job): OutputWriterFactory = {
- // TODO
- throw new UnsupportedOperationException
- }
-
- override def buildScan(
- requiredColumns: Array[String],
- filters: Array[Filter],
- inputFiles: Array[FileStatus]): RDD[Row] = {
- val conf = new Configuration(job.getConfiguration)
- filters.flatMap { filter =>
- CarbonFilters.createCarbonFilter(dataSchema, filter)
- }.reduceOption(new AndExpression(_, _))
- .foreach(CarbonTableInputFormat.setFilterPredicates(conf, _))
-
- val projection = new CarbonProjection
- requiredColumns.foreach(projection.addColumn)
- CarbonTableInputFormat.setColumnProjection(conf, projection)
- CarbonTableInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
-
- new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
- new SerializableConfiguration(conf),
- absIdentifier,
- classOf[CarbonTableInputFormat[Row]],
- classOf[Row]
- )
- }
-
-}
-
-class CarbonHadoopFSPartition(rddId: Int, val idx: Int,
- val carbonSplit: SerializableWritable[CarbonInputSplit])
- extends Partition {
-
- override val index: Int = idx
-
- override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-class CarbonHadoopFSRDD[V: ClassTag](
- @transient sc: SparkContext,
- conf: SerializableConfiguration,
- identifier: AbsoluteTableIdentifier,
- inputFormatClass: Class[_ <: CarbonTableInputFormat[V]],
- valueClass: Class[V])
- extends CarbonRDD[V](sc, Nil) with SparkHadoopMapReduceUtil {
-
- private val jobTrackerId: String = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- formatter.format(new Date())
- }
- @transient protected val jobId = new JobID(jobTrackerId, id)
-
- override def internalCompute(split: Partition,
- context: TaskContext): Iterator[V] = {
- val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
- val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId)
- val job: Job = new Job(hadoopAttemptContext.getConfiguration)
- val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, job)
- CarbonInputFormat.setDataTypeConverter(hadoopAttemptContext.getConfiguration,
- new SparkDataTypeConverterImpl)
- hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
- val reader =
- format.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
- hadoopAttemptContext
- )
- reader.initialize(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
- hadoopAttemptContext
- )
- new Iterator[V] {
- private[this] var havePair = false
- private[this] var finished = false
-
- override def hasNext: Boolean = {
- if (context.isInterrupted) {
- throw new TaskKilledException
- }
- if (!finished && !havePair) {
- finished = !reader.nextKeyValue
- if (finished) {
- reader.close()
- }
- havePair = !finished
- }
- !finished
- }
-
- override def next(): V = {
- if (!hasNext) {
- throw new java.util.NoSuchElementException("End of stream")
- }
- havePair = false
- reader.getCurrentValue
- }
- }
- }
-
- override protected def getPartitions: Array[Partition] = {
- val jobContext = newJobContext(conf.value, jobId)
- val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, new Job(conf.value))
- jobContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
- val splits = format.getSplits(jobContext).toArray
- val carbonInputSplits = splits
- .map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit]))
- carbonInputSplits.zipWithIndex.map(f => new CarbonHadoopFSPartition(id, f._2, f._1))
- }
-}
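
The anonymous Iterator[V] in internalCompute above is a standard adapter over Hadoop's pull-based RecordReader: buffer one key/value pair, close the reader exactly once on exhaustion. A generic sketch of that adapter, minus the TaskContext interrupt check (asIterator is an illustrative helper, not CarbonData code):

import org.apache.hadoop.mapreduce.RecordReader

object RecordReaderIterator {
  def asIterator[K, V](reader: RecordReader[K, V]): Iterator[V] = new Iterator[V] {
    private var havePair = false // a value has been fetched but not yet consumed
    private var finished = false // the reader is exhausted and closed

    override def hasNext: Boolean = {
      if (!finished && !havePair) {
        finished = !reader.nextKeyValue() // advance; false signals end of input
        if (finished) reader.close()      // close exactly once, on exhaustion
        havePair = !finished
      }
      !finished
    }

    override def next(): V = {
      if (!hasNext) throw new java.util.NoSuchElementException("End of stream")
      havePair = false // consume the buffered pair
      reader.getCurrentValue
    }
  }
}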