Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/25 10:21:02 UTC
[2/4] incubator-carbondata git commit: change ScanRdd to use RecordReader
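For orientation, the core of this change is that CarbonScanRDD.compute() now drives a Hadoop RecordReader directly instead of a query-executor iterator. Below is a minimal sketch (not part of the commit) of that RecordReader-to-Iterator pattern; the object/method names and the RecordReader[Void, V] key type are illustrative assumptions.

import java.util.NoSuchElementException

import org.apache.hadoop.mapreduce.RecordReader
import org.apache.spark.{TaskContext, TaskKilledException}

object RecordReaderIteratorSketch {
  // Wrap a Hadoop RecordReader into a Spark task Iterator, closing the reader
  // when the input is exhausted or the task completes (mirrors the new compute()).
  def readerAsIterator[V](reader: RecordReader[Void, V], context: TaskContext): Iterator[V] =
    new Iterator[V] {
      private var havePair = false
      private var finished = false

      // ensure the reader is released even if the task is interrupted
      context.addTaskCompletionListener { _ => reader.close() }

      override def hasNext: Boolean = {
        if (context.isInterrupted) {
          throw new TaskKilledException
        }
        if (!finished && !havePair) {
          finished = !reader.nextKeyValue()
          if (finished) {
            reader.close()
          }
          havePair = !finished
        }
        !finished
      }

      override def next(): V = {
        if (!hasNext) {
          throw new NoSuchElementException("End of stream")
        }
        havePair = false
        reader.getCurrentValue
      }
    }
}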
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 7798e5c..e8d7399 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -17,49 +17,43 @@
package org.apache.carbondata.spark.rdd
+import java.text.SimpleDateFormat
import java.util
+import java.util.Date
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.hadoop.mapreduce.{InputSplit, Job, JobID}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, TaskContext, TaskKilledException}
+import org.apache.spark.mapred.CarbonHadoopMapReduceUtil
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.carbon.datastore.block.{BlockletInfos, TableBlockInfo}
-import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.core.carbon.datastore.block.Distributable
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
-import org.apache.carbondata.lcm.status.SegmentStatusManager
-import org.apache.carbondata.scan.executor.QueryExecutor
-import org.apache.carbondata.scan.executor.QueryExecutorFactory
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport
import org.apache.carbondata.scan.expression.Expression
-import org.apache.carbondata.scan.model.QueryModel
-import org.apache.carbondata.scan.result.BatchResult
-import org.apache.carbondata.scan.result.iterator.ChunkRowIterator
-import org.apache.carbondata.spark.RawValue
import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.util.QueryPlanUtil
-
-class CarbonSparkPartition(rddId: Int, val idx: Int,
- val locations: Array[String],
- val tableBlockInfos: util.List[TableBlockInfo])
+class CarbonSparkPartition(
+ val rddId: Int,
+ val idx: Int,
+ @transient val multiBlockSplit: CarbonMultiBlockSplit)
extends Partition {
+ val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)
+
override val index: Int = idx
- // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations)
- override def hashCode(): Int = {
- 41 * (41 + rddId) + idx
- }
+ override def hashCode(): Int = 41 * (41 + rddId) + idx
}
/**
@@ -68,169 +62,135 @@ class CarbonSparkPartition(rddId: Int, val idx: Int,
* level filtering in driver side.
*/
class CarbonScanRDD[V: ClassTag](
- sc: SparkContext,
- queryModel: QueryModel,
+ @transient sc: SparkContext,
+ columnProjection: Seq[Attribute],
filterExpression: Expression,
- keyClass: RawValue[V],
- @transient conf: Configuration,
- tableCreationTime: Long,
- schemaLastUpdatedTime: Long,
- baseStoreLocation: String)
- extends RDD[V](sc, Nil) {
+ identifier: AbsoluteTableIdentifier,
+ @transient carbonTable: CarbonTable)
+ extends RDD[V](sc, Nil)
+ with CarbonHadoopMapReduceUtil
+ with Logging {
+
+ private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
+ private val jobTrackerId: String = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+ formatter.format(new Date())
+ }
+ @transient private val jobId = new JobID(jobTrackerId, id)
+ @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
override def getPartitions: Array[Partition] = {
- var defaultParallelism = sparkContext.defaultParallelism
- val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
- val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
- QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
+ val job = Job.getInstance(new Configuration())
+ val format = prepareInputFormatForDriver(job.getConfiguration)
// initialise query_id for job
- job.getConfiguration.set("query.id", queryModel.getQueryId)
-
- val result = new util.ArrayList[Partition](defaultParallelism)
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val validAndInvalidSegments = new SegmentStatusManager(queryModel.getAbsoluteTableIdentifier)
- .getValidAndInvalidSegments
- // set filter resolver tree
- try {
- // before applying filter check whether segments are available in the table.
- if (!validAndInvalidSegments.getValidSegments.isEmpty) {
- val filterResolver = carbonInputFormat
- .getResolvedFilter(job.getConfiguration, filterExpression)
- CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
- queryModel.setFilterExpressionResolverTree(filterResolver)
- CarbonInputFormat
- .setSegmentsToAccess(job.getConfiguration,
- validAndInvalidSegments.getValidSegments
- )
- SegmentTaskIndexStore.getInstance()
- .removeTableBlocks(validAndInvalidSegments.getInvalidSegments,
- queryModel.getAbsoluteTableIdentifier
- )
- }
- } catch {
- case e: Exception =>
- LOGGER.error(e)
- sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
- }
+ job.getConfiguration.set("query.id", queryId)
+
// get splits
- val splits = carbonInputFormat.getSplits(job)
+ val splits = format.getSplits(job)
+ val result = distributeSplits(splits)
+ result
+ }
+
+ private def distributeSplits(splits: util.List[InputSplit]): Array[Partition] = {
+ // this function distributes the split based on following logic:
+ // 1. based on data locality, to make split balanced on all available nodes
+ // 2. if the number of split for one
+
+ var statistic = new QueryStatistic()
+ val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+ val parallelism = sparkContext.defaultParallelism
+ val result = new util.ArrayList[Partition](parallelism)
+ var noOfBlocks = 0
+ var noOfNodes = 0
+ var noOfTasks = 0
+
if (!splits.isEmpty) {
- val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
- queryModel.setInvalidSegmentIds(validAndInvalidSegments.getInvalidSegments)
- val blockListTemp = carbonInputSplits.map(inputSplit =>
- new TableBlockInfo(inputSplit.getPath.toString,
- inputSplit.getStart, inputSplit.getSegmentId,
- inputSplit.getLocations, inputSplit.getLength,
- new BlockletInfos(inputSplit.getNumberOfBlocklets, 0, inputSplit.getNumberOfBlocklets)
- )
- )
- var activeNodes = Array[String]()
- if (blockListTemp.nonEmpty) {
- activeNodes = DistributionUtil
- .ensureExecutorsAndGetNodeList(blockListTemp.toArray, sparkContext)
- }
- defaultParallelism = sparkContext.defaultParallelism
- val blockList = CarbonLoaderUtil.
- distributeBlockLets(blockListTemp.asJava, defaultParallelism).asScala
-
- if (blockList.nonEmpty) {
- var statistic = new QueryStatistic()
- // group blocks to nodes, tasks
- val nodeBlockMapping =
- CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
- activeNodes.toList.asJava
- )
- statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
- statisticRecorder.recordStatisticsForDriver(statistic, queryModel.getQueryId())
- statistic = new QueryStatistic()
- var i = 0
- // Create Spark Partition for each task and assign blocks
- nodeBlockMapping.asScala.foreach { entry =>
- entry._2.asScala.foreach { blocksPerTask => {
- val tableBlockInfo = blocksPerTask.asScala.map(_.asInstanceOf[TableBlockInfo])
- if (blocksPerTask.size() != 0) {
- result
- .add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, tableBlockInfo.asJava))
- i += 1
- }
+ // create a list of block based on split
+ val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
+
+ // get the list of executors and map blocks to executors based on locality
+ val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
+
+ // divide the blocks among the tasks of the nodes as per the data locality
+ val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
+ parallelism, activeNodes.toList.asJava)
+
+ statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
+ statisticRecorder.recordStatisticsForDriver(statistic, queryId)
+ statistic = new QueryStatistic()
+
+ var i = 0
+ // Create Spark Partition for each task and assign blocks
+ nodeBlockMapping.asScala.foreach { case (node, blockList) =>
+ blockList.asScala.foreach { blocksPerTask =>
+ val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
+ if (blocksPerTask.size() != 0) {
+ val multiBlockSplit = new CarbonMultiBlockSplit(identifier, splits.asJava, node)
+ val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
+ result.add(partition)
+ i += 1
}
- }
- }
- val noOfBlocks = blockList.size
- val noOfNodes = nodeBlockMapping.size
- val noOfTasks = result.size()
- logInfo(s"Identified no.of.Blocks: $noOfBlocks,"
- + s"parallelism: $defaultParallelism , " +
- s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
- )
- statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
- System.currentTimeMillis)
- statisticRecorder.recordStatisticsForDriver(statistic, queryModel.getQueryId())
- statisticRecorder.logStatisticsAsTableDriver()
- result.asScala.foreach { r =>
- val cp = r.asInstanceOf[CarbonSparkPartition]
- logInfo(s"Node: ${ cp.locations.toSeq.mkString(",") }" +
- s", No.Of Blocks: ${ cp.tableBlockInfos.size() }"
- )
}
- } else {
- logInfo("No blocks identified to scan")
}
- } else {
- logInfo("No valid segments found to scan")
+
+ noOfBlocks = splits.size
+ noOfNodes = nodeBlockMapping.size
+ noOfTasks = result.size()
+
+ statistic = new QueryStatistic()
+ statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
+ System.currentTimeMillis)
+ statisticRecorder.recordStatisticsForDriver(statistic, queryId)
+ statisticRecorder.logStatisticsAsTableDriver()
}
+ logInfo(
+ s"""
+ | Identified no.of.blocks: $noOfBlocks,
+ | no.of.tasks: $noOfTasks,
+ | no.of.nodes: $noOfNodes,
+ | parallelism: $parallelism
+ """.stripMargin)
result.toArray(new Array[Partition](result.size()))
}
- override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val iter = new Iterator[V] {
- var rowIterator: CarbonIterator[Array[Any]] = _
- var queryStartTime: Long = 0
- val queryExecutor = QueryExecutorFactory.getQueryExecutor()
- try {
- context.addTaskCompletionListener(context => {
- clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
- logStatistics()
- queryExecutor.finish
- })
- val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
- if (!carbonSparkPartition.tableBlockInfos.isEmpty) {
- queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
- // fill table block info
- queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
- queryStartTime = System.currentTimeMillis
- val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
- logInfo("*************************" + carbonPropertiesFilePath)
- if (null == carbonPropertiesFilePath) {
- System.setProperty("carbon.properties.filepath",
- System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
- }
- // execute query
- rowIterator = new ChunkRowIterator(
- queryExecutor.execute(queryModel).
- asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
+ override def compute(split: Partition, context: TaskContext): Iterator[V] = {
+ val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
+ if (null == carbonPropertiesFilePath) {
+ System.setProperty("carbon.properties.filepath",
+ System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
+ )
+ }
- }
- } catch {
- case e: Exception =>
- LOGGER.error(e)
- if (null != e.getMessage) {
- sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
- } else {
- sys.error("Exception occurred in query execution.Please check logs.")
- }
- }
+ val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+ val attemptContext = newTaskAttemptContext(new Configuration(), attemptId)
+ val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
+ val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+ val reader = format.createRecordReader(inputSplit, attemptContext)
+ reader.initialize(inputSplit, attemptContext)
+
+ val queryStartTime = System.currentTimeMillis
- var havePair = false
- var finished = false
- var recordCount = 0
+ new Iterator[V] {
+ private var havePair = false
+ private var finished = false
+ private var count = 0
+
+ context.addTaskCompletionListener { context =>
+ logStatistics(queryStartTime, count)
+ reader.close()
+ }
override def hasNext: Boolean = {
+ if (context.isInterrupted) {
+ throw new TaskKilledException
+ }
if (!finished && !havePair) {
- finished = (null == rowIterator) || (!rowIterator.hasNext)
+ finished = !reader.nextKeyValue
+ if (finished) {
+ reader.close()
+ }
havePair = !finished
}
!finished
@@ -241,68 +201,55 @@ class CarbonScanRDD[V: ClassTag](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
- recordCount += 1
- keyClass.getValue(rowIterator.next())
+ val value: V = reader.getCurrentValue
+ count += 1
+ value
}
+ }
+ }
- def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = {
- if (null != columnToDictionaryMap) {
- org.apache.carbondata.spark.util.CarbonQueryUtil
- .clearColumnDictionaryCache(columnToDictionaryMap)
- }
- }
+ private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[V] = {
+ CarbonInputFormat.setCarbonTable(conf, carbonTable)
+ createInputFormat(conf)
+ }
- def logStatistics(): Unit = {
- if (null != queryModel.getStatisticsRecorder) {
- var queryStatistic = new QueryStatistic()
- queryStatistic
- .addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
- System.currentTimeMillis - queryStartTime
- )
- queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
- // result size
- queryStatistic = new QueryStatistic()
- queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
- queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
- // print executor query statistics for each task_id
- queryModel.getStatisticsRecorder.logStatisticsAsTableExecutor()
- }
- }
+ private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[V] = {
+ CarbonInputFormat.setCarbonReadSupport(classOf[RawDataReadSupport], conf)
+ createInputFormat(conf)
+ }
+
+ private def createInputFormat(conf: Configuration): CarbonInputFormat[V] = {
+ val format = new CarbonInputFormat[V]
+ CarbonInputFormat.setTablePath(conf, identifier.getTablePath)
+ CarbonInputFormat.setFilterPredicates(conf, filterExpression)
+ val projection = new CarbonProjection
+ columnProjection.foreach { attr =>
+ projection.addColumn(attr.name)
}
+ CarbonInputFormat.setColumnProjection(conf, projection)
+ format
+ }
- iter
+ def logStatistics(queryStartTime: Long, recordCount: Int): Unit = {
+ var queryStatistic = new QueryStatistic()
+ queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+ System.currentTimeMillis - queryStartTime)
+ val statisticRecorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId)
+ statisticRecorder.recordStatistics(queryStatistic)
+ // result size
+ queryStatistic = new QueryStatistic()
+ queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
+ statisticRecorder.recordStatistics(queryStatistic)
+ // print executor query statistics for each task_id
+ statisticRecorder.logStatisticsAsTableExecutor()
}
/**
* Get the preferred locations where to launch this task.
*/
- override def getPreferredLocations(partition: Partition): Seq[String] = {
- val theSplit = partition.asInstanceOf[CarbonSparkPartition]
- val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
- val tableBlocks = theSplit.tableBlockInfos
- // node name and count mapping
- val blockMap = new util.LinkedHashMap[String, Integer]()
-
- tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
- location => {
- if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
- val currentCount = blockMap.get(location)
- if (currentCount == null) {
- blockMap.put(location, 1)
- } else {
- blockMap.put(location, currentCount + 1)
- }
- }
- }
- )
- )
-
- val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
- nodeCount1.getValue > nodeCount2.getValue
- }
- )
-
- val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
- firstOptionLocation ++ sortedNodesList
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ val theSplit = split.asInstanceOf[CarbonSparkPartition]
+ val firstOptionLocation = theSplit.split.value.getLocations.filter(_ != "localhost")
+ firstOptionLocation
}
}
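For illustration only: a simplified locality grouping in the spirit of the distributeSplits comment in the diff above. It stands in for CarbonLoaderUtil.nodeBlockTaskMapping, which performs the real node/task balancing; the names here are hypothetical and not part of the commit.

import org.apache.hadoop.mapreduce.InputSplit

object LocalityGroupingSketch {
  // Group splits by their first reported location so each node receives one bundle
  // of blocks; splits that report no locations fall back to "localhost".
  def groupByNode(splits: Seq[InputSplit]): Map[String, Seq[InputSplit]] =
    splits.groupBy(split => split.getLocations.headOption.getOrElse("localhost"))
}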
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 9c9be8d..5fdbc5d 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -73,17 +73,8 @@ object Compactor {
maxSegmentColumnSchemaList = null
)
carbonLoadModel.setStorePath(carbonMergerMapping.storePath)
- val segmentStatusManager = new SegmentStatusManager(new AbsoluteTableIdentifier
- (CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId
- )
- )
- )
- carbonLoadModel.setLoadMetadataDetails(segmentStatusManager
- .readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava
- )
+ carbonLoadModel.setLoadMetadataDetails(
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
var execInstance = "1"
// in case of non dynamic executor allocation, number of executors are fixed.
if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
deleted file mode 100644
index c55c807..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
-
-
-/**
- * All the utility functions for carbon plan creation
- */
-object QueryPlanUtil {
-
- /**
- * createCarbonInputFormat from query model
- */
- def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
- (CarbonInputFormat[Array[Object]], Job) = {
- val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
- val jobConf: JobConf = new JobConf(new Configuration)
- val job: Job = new Job(jobConf)
- FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
- (carbonInputFormat, job)
- }
-
- def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
- conf: Configuration) : CarbonInputFormat[V] = {
- val carbonInputFormat = new CarbonInputFormat[V]()
- val job: Job = new Job(conf)
- FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
- carbonInputFormat
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index c9d2a0f..ca0ad58 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -38,14 +38,13 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
-import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.carbon.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
-import org.apache.carbondata.hadoop.util.SchemaReader
+import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
import org.apache.carbondata.scan.expression.logical.AndExpression
import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
-import org.apache.carbondata.spark.util.QueryPlanUtil
private[sql] case class CarbonDatasourceHadoopRelation(
@@ -104,7 +103,7 @@ private[sql] case class CarbonDatasourceHadoopRelation(
val projection = new CarbonProjection
requiredColumns.foreach(projection.addColumn)
- CarbonInputFormat.setColumnProjection(projection, conf)
+ CarbonInputFormat.setColumnProjection(conf, projection)
CarbonInputFormat.setCarbonReadSupport(classOf[SparkRowReadSupportImpl], conf)
new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
@@ -145,12 +144,11 @@ class CarbonHadoopFSRDD[V: ClassTag](
context: TaskContext): Iterator[V] = {
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId)
- val inputFormat = QueryPlanUtil.createCarbonInputFormat(identifier,
- hadoopAttemptContext.getConfiguration
- )
+ val job: Job = new Job(hadoopAttemptContext.getConfiguration)
+ val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, job)
hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
val reader =
- inputFormat.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
+ format.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
hadoopAttemptContext
)
reader.initialize(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
@@ -186,11 +184,9 @@ class CarbonHadoopFSRDD[V: ClassTag](
override protected def getPartitions: Array[Partition] = {
val jobContext = newJobContext(conf.value, jobId)
- val carbonInputFormat = QueryPlanUtil.createCarbonInputFormat(identifier,
- jobContext.getConfiguration
- )
+ val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, new Job(conf.value))
jobContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
- val splits = carbonInputFormat.getSplits(jobContext).toArray
+ val splits = format.getSplits(jobContext).toArray
val carbonInputSplits = splits
.map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit]))
carbonInputSplits.zipWithIndex.map(f => new CarbonHadoopFSPartition(id, f._2, f._1))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 069e106..a06d5cb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -272,9 +272,8 @@ case class CarbonRelation(
private var sizeInBytesLocalValue = 0L
def sizeInBytes: Long = {
- val tableStatusNewLastUpdatedTime = new SegmentStatusManager(
+ val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
tableMeta.carbonTable.getAbsoluteTableIdentifier)
- .getTableStatusLastModifiedTime
if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
val tablePath = CarbonStorePath.getCarbonTablePath(
tableMeta.storePath,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
deleted file mode 100644
index c105cae..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.ArrayList
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.LeafNode
-import org.apache.spark.sql.hive.CarbonMetastoreCatalog
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.scan.model._
-import org.apache.carbondata.spark.{CarbonFilters, RawValue, RawValueImpl}
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
-
-case class CarbonScan(
- var attributesRaw: Seq[Attribute],
- relationRaw: CarbonRelation,
- dimensionPredicatesRaw: Seq[Expression],
- useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode {
- val carbonTable = relationRaw.metaData.carbonTable
- val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
- val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
- @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
-
- val attributesNeedToDecode = new java.util.LinkedHashSet[AttributeReference]()
- val unprocessedExprs = new ArrayBuffer[Expression]()
-
- val buildCarbonPlan: CarbonQueryPlan = {
- val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.databaseName, relationRaw.tableName)
-
- plan.setSortedDimemsions(new ArrayList[QueryDimension])
-
- plan.setOutLocationPath(
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS))
- plan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
- processFilterExpressions(plan)
- plan
- }
-
- def processFilterExpressions(plan: CarbonQueryPlan) {
- if (dimensionPredicatesRaw.nonEmpty) {
- val expressionVal = CarbonFilters.processExpression(
- dimensionPredicatesRaw,
- attributesNeedToDecode,
- unprocessedExprs,
- carbonTable)
- expressionVal match {
- case Some(ce) =>
- // adding dimension used in expression in querystats
- plan.setFilterExpression(ce)
- case _ =>
- }
- }
- processExtraAttributes(plan)
- }
-
- private def processExtraAttributes(plan: CarbonQueryPlan) {
- if (attributesNeedToDecode.size() > 0) {
- val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
-
- attributesNeedToDecode.asScala.foreach { attr =>
- if (!attributesRaw.exists(_.name.equalsIgnoreCase(attr.name))) {
- attributeOut += attr
- }
- }
- attributesRaw = attributeOut
- }
-
- val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
- val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
- val dimAttr = new Array[Attribute](dimensions.size())
- val msrAttr = new Array[Attribute](measures.size())
- attributesRaw.foreach { attr =>
- val carbonDimension =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
- if(carbonDimension != null) {
- dimAttr(dimensions.indexOf(carbonDimension)) = attr
- } else {
- val carbonMeasure =
- carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
- if(carbonMeasure != null) {
- msrAttr(measures.indexOf(carbonMeasure)) = attr
- }
- }
- }
-
- attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
-
- var queryOrder: Integer = 0
- attributesRaw.foreach { attr =>
- val carbonDimension =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
- if (carbonDimension != null) {
- val dim = new QueryDimension(attr.name)
- dim.setQueryOrder(queryOrder)
- queryOrder = queryOrder + 1
- selectedDims += dim
- } else {
- val carbonMeasure =
- carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
- if (carbonMeasure != null) {
- val m1 = new QueryMeasure(attr.name)
- m1.setQueryOrder(queryOrder)
- queryOrder = queryOrder + 1
- selectedMsrs += m1
- }
- }
- }
-
- // Fill the selected dimensions & measures obtained from
- // attributes to query plan for detailed query
- selectedDims.foreach(plan.addDimension)
- selectedMsrs.foreach(plan.addMeasure)
- }
-
-
- def inputRdd: CarbonScanRDD[Array[Any]] = {
-
- val conf = new Configuration()
- val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val model = QueryModel.createModel(
- absoluteTableIdentifier, buildCarbonPlan, carbonTable)
- val kv: RawValue[Array[Any]] = new RawValueImpl
- // setting queryid
- buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
-
- val tableCreationTime = carbonCatalog
- .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
- val schemaLastUpdatedTime = carbonCatalog
- .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
- val big = new CarbonScanRDD(
- ocRaw.sparkContext,
- model,
- buildCarbonPlan.getFilterExpression,
- kv,
- conf,
- tableCreationTime,
- schemaLastUpdatedTime,
- carbonCatalog.storePath)
- big
- }
-
-
- override def outputsUnsafeRows: Boolean =
- (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
-
- override def doExecute(): RDD[InternalRow] = {
- val outUnsafeRows: Boolean = (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
- inputRdd.mapPartitions { iter =>
- val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
- new Iterator[InternalRow] {
- override def hasNext: Boolean = iter.hasNext
-
- override def next(): InternalRow =
- if (outUnsafeRows) {
- unsafeProjection(new GenericMutableRow(iter.next()))
- } else {
- new GenericMutableRow(iter.next())
- }
- }
- }
- }
-
- def output: Seq[Attribute] = {
- attributesRaw
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
new file mode 100644
index 0000000..6580c4f
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.ArrayList
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.LeafNode
+import org.apache.spark.sql.hive.CarbonMetastoreCatalog
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.scan.model._
+import org.apache.carbondata.spark.CarbonFilters
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+case class CarbonScan(
+ var attributesRaw: Seq[Attribute],
+ relationRaw: CarbonRelation,
+ dimensionPredicatesRaw: Seq[Expression],
+ useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode {
+ val carbonTable = relationRaw.metaData.carbonTable
+ val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
+ val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
+ @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
+
+ val attributesNeedToDecode = new java.util.LinkedHashSet[AttributeReference]()
+ val unprocessedExprs = new ArrayBuffer[Expression]()
+
+ val buildCarbonPlan: CarbonQueryPlan = {
+ val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.databaseName, relationRaw.tableName)
+
+ plan.setSortedDimemsions(new ArrayList[QueryDimension])
+
+ plan.setOutLocationPath(
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS))
+ plan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
+ processFilterExpressions(plan)
+ plan
+ }
+
+ def processFilterExpressions(plan: CarbonQueryPlan) {
+ if (dimensionPredicatesRaw.nonEmpty) {
+ val expressionVal = CarbonFilters.processExpression(
+ dimensionPredicatesRaw,
+ attributesNeedToDecode,
+ unprocessedExprs,
+ carbonTable)
+ expressionVal match {
+ case Some(ce) =>
+ // adding dimension used in expression in querystats
+ plan.setFilterExpression(ce)
+ case _ =>
+ }
+ }
+ processExtraAttributes(plan)
+ }
+
+ private def processExtraAttributes(plan: CarbonQueryPlan) {
+ if (attributesNeedToDecode.size() > 0) {
+ val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
+
+ attributesNeedToDecode.asScala.foreach { attr =>
+ if (!attributesRaw.exists(_.name.equalsIgnoreCase(attr.name))) {
+ attributeOut += attr
+ }
+ }
+ attributesRaw = attributeOut
+ }
+
+ val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
+ val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
+ val dimAttr = new Array[Attribute](dimensions.size())
+ val msrAttr = new Array[Attribute](measures.size())
+ attributesRaw.foreach { attr =>
+ val carbonDimension =
+ carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+ if(carbonDimension != null) {
+ dimAttr(dimensions.indexOf(carbonDimension)) = attr
+ } else {
+ val carbonMeasure =
+ carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
+ if(carbonMeasure != null) {
+ msrAttr(measures.indexOf(carbonMeasure)) = attr
+ }
+ }
+ }
+
+ attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
+
+ var queryOrder: Integer = 0
+ attributesRaw.foreach { attr =>
+ val carbonDimension =
+ carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+ if (carbonDimension != null) {
+ val dim = new QueryDimension(attr.name)
+ dim.setQueryOrder(queryOrder)
+ queryOrder = queryOrder + 1
+ selectedDims += dim
+ } else {
+ val carbonMeasure =
+ carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
+ if (carbonMeasure != null) {
+ val m1 = new QueryMeasure(attr.name)
+ m1.setQueryOrder(queryOrder)
+ queryOrder = queryOrder + 1
+ selectedMsrs += m1
+ }
+ }
+ }
+
+ // Fill the selected dimensions & measures obtained from
+ // attributes to query plan for detailed query
+ selectedDims.foreach(plan.addDimension)
+ selectedMsrs.foreach(plan.addMeasure)
+ }
+
+ def inputRdd: CarbonScanRDD[Array[Any]] = {
+
+ val conf = new Configuration()
+ val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+
+ // setting queryid
+ buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
+
+ val tableCreationTime = carbonCatalog
+ .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
+ val schemaLastUpdatedTime = carbonCatalog
+ .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
+ new CarbonScanRDD(
+ ocRaw.sparkContext,
+ attributesRaw,
+ buildCarbonPlan.getFilterExpression,
+ absoluteTableIdentifier,
+ carbonTable
+ )
+ }
+
+ override def outputsUnsafeRows: Boolean =
+ (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
+
+ override def doExecute(): RDD[InternalRow] = {
+ val outUnsafeRows: Boolean = (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
+ inputRdd.mapPartitions { iter =>
+ val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+ new Iterator[InternalRow] {
+ override def hasNext: Boolean = iter.hasNext
+
+ override def next(): InternalRow = {
+ val value = iter.next
+ if (outUnsafeRows) {
+ unsafeProjection(new GenericMutableRow(value))
+ } else {
+ new GenericMutableRow(value)
+ }
+ }
+ }
+ }
+ }
+
+ def output: Seq[Attribute] = {
+ attributesRaw
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index a6b4ec5..74b0dd2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -924,10 +924,9 @@ private[sql] case class DeleteLoadsById(
}
val path = carbonTable.getMetaDataFilepath
- val segmentStatusManager =
- new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
try {
- val invalidLoadIds = segmentStatusManager.updateDeletionStatus(loadids.asJava, path).asScala
+ val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
+ carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
if (invalidLoadIds.isEmpty) {
@@ -986,8 +985,6 @@ private[sql] case class DeleteLoadsByLoadDate(
val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
.getCarbonTable(dbName + '_' + tableName)
- val segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
-
if (null == carbonTable) {
var relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
.lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
@@ -995,8 +992,9 @@ private[sql] case class DeleteLoadsByLoadDate(
val path = carbonTable.getMetaDataFilepath()
try {
- val invalidLoadTimestamps = segmentStatusManager
- .updateDeletionStatus(loadDate, path, timeObj.asInstanceOf[java.lang.Long]).asScala
+ val invalidLoadTimestamps = SegmentStatusManager.updateDeletionStatus(
+ carbonTable.getAbsoluteTableIdentifier, loadDate, path,
+ timeObj.asInstanceOf[java.lang.Long]).asScala
if (invalidLoadTimestamps.isEmpty) {
LOGGER.audit(s"Delete segment by date is successfull for $dbName.$tableName.")
}
@@ -1328,12 +1326,8 @@ private[sql] case class ShowLoads(
if (carbonTable == null) {
sys.error(s"$databaseName.$tableName is not found")
}
- val path = carbonTable.getMetaDataFilepath()
-
- val segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
-
- val loadMetadataDetailsArray = segmentStatusManager.readLoadMetadata(path)
-
+ val path = carbonTable.getMetaDataFilepath
+ val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path)
if (loadMetadataDetailsArray.nonEmpty) {
val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 0c13293..25c36c5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -16,7 +16,6 @@
*/
package org.apache.spark.sql.hive
-
import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
import scala.collection.JavaConverters._
@@ -44,12 +43,9 @@ object DistributionUtil {
* localhost for retriving executor list
*/
def getNodeList(sparkContext: SparkContext): Array[String] = {
-
- val arr =
- sparkContext.getExecutorMemoryStatus.map {
- kv =>
- kv._1.split(":")(0)
- }.toSeq
+ val arr = sparkContext.getExecutorMemoryStatus.map { kv =>
+ kv._1.split(":")(0)
+ }.toSeq
val localhostIPs = getLocalhostIPs
val selectedLocalIPList = localhostIPs.filter(arr.contains(_))
@@ -109,10 +105,9 @@ object DistributionUtil {
* @param sparkContext
* @return
*/
- def ensureExecutorsAndGetNodeList(blockList: Array[Distributable],
- sparkContext: SparkContext):
- Array[String] = {
- val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
+ def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
+ sparkContext: SparkContext): Seq[String] = {
+ val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
var confExecutorsTemp: String = null
if (sparkContext.getConf.contains("spark.executor.instances")) {
confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
@@ -131,7 +126,9 @@ object DistributionUtil {
}
val requiredExecutors = if (nodeMapping.size > confExecutors) {
confExecutors
- } else { nodeMapping.size() }
+ } else {
+ nodeMapping.size()
+ }
val startTime = System.currentTimeMillis()
CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index cc00c47..f02d4e7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@ -35,10 +35,9 @@ import org.apache.carbondata.core.util.CarbonProperties
class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
-
+ clean
val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
.getCanonicalPath
-
sql("create table if not exists Carbon_automation_test (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active
_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_INCLUDE'='Latest_MONTH,Latest_DAY,deviceInformationId')");
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
@@ -52,11 +51,14 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
}
+ def clean{
+ sql("drop table if exists Carbon_automation_test")
+ sql("drop table if exists Carbon_automation_hive")
+ sql("drop table if exists Carbon_automation_test_hive")
+ }
+
override def afterAll {
- sql("drop table Carbon_automation_test")
- sql("drop table Carbon_automation_hive")
- sql("drop table Carbon_automation_test_hive")
-
+ clean
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
}
@@ -425,10 +427,10 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
})
//TC_103
- test("select variance(deviceInformationId) as a from Carbon_automation_test")({
+ test("select variance(deviceInformationId) as a from carbon_automation_test")({
checkAnswer(
- sql("select variance(deviceInformationId) as a from Carbon_automation_test"),
- sql("select variance(deviceInformationId) as a from Carbon_automation_hive"))
+ sql("select variance(deviceInformationId) as a from carbon_automation_test"),
+ sql("select variance(deviceInformationId) as a from carbon_automation_hive"))
})
//TC_105
test("select var_samp(deviceInformationId) as a from Carbon_automation_test")({
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
index 7343a81..924d91a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
@@ -113,26 +113,22 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll {
sql("clean files for table table2")
// check for table 1.
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier1 = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table1", "rrr")
- )
- )
+ new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table1", "rrr"))
// merged segment should not be there
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier1)
+ .getValidSegments.asScala.toList
assert(segments.contains("0.1"))
assert(!segments.contains("0"))
assert(!segments.contains("1"))
// check for table 2.
- val segmentStatusManager2: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier2 = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table2", "rrr1")
- )
- )
+ new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table2", "rrr1"))
// merged segment should not be there
- val segments2 = segmentStatusManager2.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments2 = SegmentStatusManager.getSegmentStatus(identifier2)
+ .getValidSegments.asScala.toList
assert(segments2.contains("0.1"))
assert(!segments2.contains("0"))
assert(!segments2.contains("1"))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
index 80a2320..f5039a7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
@@ -43,9 +43,6 @@ class DataCompactionBoundaryConditionsTest extends QueryTest with BeforeAndAfter
val carbonTableIdentifier: CarbonTableIdentifier =
new CarbonTableIdentifier("default", "boundarytest".toLowerCase(), "1")
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier))
-
override def beforeAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,2")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
index d780efe..f77ec9b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
@@ -86,13 +86,13 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter
var noOfRetries = 0
while (status && noOfRetries < 10) {
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "cardinalityTest", "1")
+ new CarbonTableIdentifier(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME, "cardinalityTest", "1")
)
- )
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
if (!segments.contains("0.1")) {
// wait for 2 seconds for compaction to complete.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
index eb889d6..7ec6431 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -105,16 +105,11 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
* Compaction should fail as lock is being held purposefully
*/
test("check if compaction is failed or not.") {
-
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
- absoluteTableIdentifier
- )
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
-
+ val segments = SegmentStatusManager.getSegmentStatus(absoluteTableIdentifier)
+ .getValidSegments.asScala.toList
if (!segments.contains("0.1")) {
assert(true)
- }
- else {
+ } else {
assert(false)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
index fbb39d8..15ed78b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
@@ -45,8 +45,7 @@ class DataCompactionMinorThresholdTest extends QueryTest with BeforeAndAfterAll
val carbonTableIdentifier: CarbonTableIdentifier =
new CarbonTableIdentifier("default", "minorthreshold".toLowerCase(), "1")
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier))
+ val identifier = new AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier)
override def beforeAll {
CarbonProperties.getInstance()
@@ -96,7 +95,8 @@ class DataCompactionMinorThresholdTest extends QueryTest with BeforeAndAfterAll
sql("clean files for table minorthreshold")
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
assert(segments.contains("0.2"))
assert(!segments.contains("0.1"))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
index c7be22f..570bb72 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
@@ -38,14 +38,10 @@ class DataCompactionNoDictionaryTest extends QueryTest with BeforeAndAfterAll {
// return segment details
def getSegments(databaseName : String, tableName : String, tableId : String): List[String] = {
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId)
- )
- )
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
- segments
+ new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId))
+ SegmentStatusManager.getSegmentStatus(identifier).getValidSegments.asScala.toList
}
val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
index 3eef8b7..137cebc 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
@@ -84,13 +84,12 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
var noOfRetries = 0
while (status && noOfRetries < 10) {
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "1")
)
- )
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
if (!segments.contains("0.1")) {
// wait for 2 seconds for compaction to complete.
@@ -131,15 +130,14 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
// delete merged segments
sql("clean files for table normalcompaction")
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
new CarbonTableIdentifier(
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "uniqueid")
)
- )
// merged segment should not be there
- val segments = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
assert(!segments.contains("0"))
assert(!segments.contains("1"))
assert(!segments.contains("2"))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 99e3d56..a1664a6 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -97,14 +97,13 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
var noOfRetries = 0
while (!status && noOfRetries < 10) {
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
new CarbonTableIdentifier(
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", noOfRetries + "")
)
- )
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
segments.foreach(seg =>
System.out.println( "valid segment is =" + seg)
)
@@ -129,15 +128,14 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
// delete merged segments
sql("clean files for table ignoremajor")
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
new CarbonTableIdentifier(
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
)
- )
// merged segment should not be there
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
assert(segments.contains("0.1"))
assert(segments.contains("2.1"))
assert(!segments.contains("2"))
@@ -156,13 +154,6 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
catch {
case _:Throwable => assert(true)
}
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
- CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(
- CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
- )
- )
val carbontablePath = CarbonStorePath
.getCarbonTablePath(CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.STORE_LOCATION),
@@ -170,7 +161,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
)
.getMetadataDirectoryPath
- var segs = segmentStatusManager.readLoadMetadata(carbontablePath)
+ val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
// status should remain as compacted.
assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED))
@@ -185,13 +176,6 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
"DELETE SEGMENTS FROM TABLE ignoremajor where STARTTIME before" +
" '2222-01-01 19:35:01'"
)
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
- CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(
- CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
- )
- )
val carbontablePath = CarbonStorePath
.getCarbonTablePath(CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.STORE_LOCATION),
@@ -199,7 +183,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
)
.getMetadataDirectoryPath
- var segs = segmentStatusManager.readLoadMetadata(carbontablePath)
+ val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
// status should remain as compacted for segment 2.
assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED))
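
Note: the readLoadMetadata calls in this file move in the same direction: the method is now invoked statically on SegmentStatusManager with the table's metadata directory path, so the manager instance and its AbsoluteTableIdentifier can be dropped. A minimal sketch of the new usage, with a placeholder metadata path (in the hunks above the real path is resolved via CarbonStorePath.getCarbonTablePath):

  import org.apache.carbondata.lcm.status.SegmentStatusManager

  // placeholder metadata directory, for illustration only
  val metadataPath = "/tmp/carbonstore/default/ignoremajor/Metadata"
  val loadDetails = SegmentStatusManager.readLoadMetadata(metadataPath)
  // each entry exposes its load status, e.g. loadDetails(3).getLoadStatus
  loadDetails.foreach(load => println(load.getLoadStatus))
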
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index 3745e11..25087a7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -87,14 +87,13 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
var noOfRetries = 0
while (!status && noOfRetries < 10) {
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
new CarbonTableIdentifier(
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", noOfRetries + "")
)
- )
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
segments.foreach(seg =>
System.out.println( "valid segment is =" + seg)
)
@@ -119,14 +118,13 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
// delete merged segments
sql("clean files for table stopmajor")
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", "rrr")
)
- )
// merged segment should not be there
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
assert(segments.contains("0.1"))
assert(!segments.contains("0.2"))
assert(!segments.contains("0"))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index bed6428..6d3cdec 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -55,7 +55,6 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
AbsoluteTableIdentifier(storeLocation,
new CarbonTableIdentifier(
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "DataRetentionTable".toLowerCase(), "300"))
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(absoluteTableIdentifierForRetention)
val carbonTablePath = CarbonStorePath
.getCarbonTablePath(absoluteTableIdentifierForRetention.getStorePath,
absoluteTableIdentifierForRetention.getCarbonTableIdentifier).getMetadataDirectoryPath
@@ -133,8 +132,8 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
}
test("RetentionTest_DeleteSegmentsByLoadTime") {
- val segments: Array[LoadMetadataDetails] = segmentStatusManager
- .readLoadMetadata(carbonTablePath)
+ val segments: Array[LoadMetadataDetails] =
+ SegmentStatusManager.readLoadMetadata(carbonTablePath)
// check segment length, it should be 2 (loads)
if (segments.length != 2) {
assert(false)