Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/25 10:21:02 UTC

[2/4] incubator-carbondata git commit: change ScanRdd to use RecordReader

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 7798e5c..e8d7399 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -17,49 +17,43 @@
 
 package org.apache.carbondata.spark.rdd
 
+import java.text.SimpleDateFormat
 import java.util
+import java.util.Date
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.hadoop.mapreduce.{InputSplit, Job, JobID}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, TaskContext, TaskKilledException}
+import org.apache.spark.mapred.CarbonHadoopMapReduceUtil
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.hive.DistributionUtil
 
-import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.carbon.datastore.block.{BlockletInfos, TableBlockInfo}
-import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.core.carbon.datastore.block.Distributable
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
-import org.apache.carbondata.lcm.status.SegmentStatusManager
-import org.apache.carbondata.scan.executor.QueryExecutor
-import org.apache.carbondata.scan.executor.QueryExecutorFactory
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport
 import org.apache.carbondata.scan.expression.Expression
-import org.apache.carbondata.scan.model.QueryModel
-import org.apache.carbondata.scan.result.BatchResult
-import org.apache.carbondata.scan.result.iterator.ChunkRowIterator
-import org.apache.carbondata.spark.RawValue
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.util.QueryPlanUtil
 
-
-class CarbonSparkPartition(rddId: Int, val idx: Int,
-    val locations: Array[String],
-    val tableBlockInfos: util.List[TableBlockInfo])
+class CarbonSparkPartition(
+    val rddId: Int,
+    val idx: Int,
+    @transient val multiBlockSplit: CarbonMultiBlockSplit)
   extends Partition {
 
+  val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)
+
   override val index: Int = idx
 
-  // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations)
-  override def hashCode(): Int = {
-    41 * (41 + rddId) + idx
-  }
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
 }
 
 /**
@@ -68,169 +62,135 @@ class CarbonSparkPartition(rddId: Int, val idx: Int,
  * level filtering in driver side.
  */
 class CarbonScanRDD[V: ClassTag](
-    sc: SparkContext,
-    queryModel: QueryModel,
+    @transient sc: SparkContext,
+    columnProjection: Seq[Attribute],
     filterExpression: Expression,
-    keyClass: RawValue[V],
-    @transient conf: Configuration,
-    tableCreationTime: Long,
-    schemaLastUpdatedTime: Long,
-    baseStoreLocation: String)
-  extends RDD[V](sc, Nil) {
+    identifier: AbsoluteTableIdentifier,
+    @transient carbonTable: CarbonTable)
+  extends RDD[V](sc, Nil)
+    with CarbonHadoopMapReduceUtil
+    with Logging {
+
+  private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
+  private val jobTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
 
+  @transient private val jobId = new JobID(jobTrackerId, id)
+  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def getPartitions: Array[Partition] = {
-    var defaultParallelism = sparkContext.defaultParallelism
-    val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
-    val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
-      QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
+    val job = Job.getInstance(new Configuration())
+    val format = prepareInputFormatForDriver(job.getConfiguration)
 
     // initialise query_id for job
-    job.getConfiguration.set("query.id", queryModel.getQueryId)
-
-    val result = new util.ArrayList[Partition](defaultParallelism)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val validAndInvalidSegments = new SegmentStatusManager(queryModel.getAbsoluteTableIdentifier)
-      .getValidAndInvalidSegments
-    // set filter resolver tree
-    try {
-      // before applying filter check whether segments are available in the table.
-      if (!validAndInvalidSegments.getValidSegments.isEmpty) {
-        val filterResolver = carbonInputFormat
-          .getResolvedFilter(job.getConfiguration, filterExpression)
-        CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
-        queryModel.setFilterExpressionResolverTree(filterResolver)
-        CarbonInputFormat
-          .setSegmentsToAccess(job.getConfiguration,
-            validAndInvalidSegments.getValidSegments
-          )
-        SegmentTaskIndexStore.getInstance()
-          .removeTableBlocks(validAndInvalidSegments.getInvalidSegments,
-            queryModel.getAbsoluteTableIdentifier
-          )
-      }
-    } catch {
-      case e: Exception =>
-        LOGGER.error(e)
-        sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
-    }
+    job.getConfiguration.set("query.id", queryId)
+
     // get splits
-    val splits = carbonInputFormat.getSplits(job)
+    val splits = format.getSplits(job)
+    val result = distributeSplits(splits)
+    result
+  }
+
+  private def distributeSplits(splits: util.List[InputSplit]): Array[Partition] = {
+    // this function distributes the split based on following logic:
+    // 1. based on data locality, to make split balanced on all available nodes
+    // 2. if the number of split for one
+
+    var statistic = new QueryStatistic()
+    val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+    val parallelism = sparkContext.defaultParallelism
+    val result = new util.ArrayList[Partition](parallelism)
+    var noOfBlocks = 0
+    var noOfNodes = 0
+    var noOfTasks = 0
+
     if (!splits.isEmpty) {
-      val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-      queryModel.setInvalidSegmentIds(validAndInvalidSegments.getInvalidSegments)
-      val blockListTemp = carbonInputSplits.map(inputSplit =>
-        new TableBlockInfo(inputSplit.getPath.toString,
-          inputSplit.getStart, inputSplit.getSegmentId,
-          inputSplit.getLocations, inputSplit.getLength,
-          new BlockletInfos(inputSplit.getNumberOfBlocklets, 0, inputSplit.getNumberOfBlocklets)
-        )
-      )
-      var activeNodes = Array[String]()
-      if (blockListTemp.nonEmpty) {
-        activeNodes = DistributionUtil
-          .ensureExecutorsAndGetNodeList(blockListTemp.toArray, sparkContext)
-      }
-      defaultParallelism = sparkContext.defaultParallelism
-      val blockList = CarbonLoaderUtil.
-        distributeBlockLets(blockListTemp.asJava, defaultParallelism).asScala
-
-      if (blockList.nonEmpty) {
-        var statistic = new QueryStatistic()
-        // group blocks to nodes, tasks
-        val nodeBlockMapping =
-        CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
-          activeNodes.toList.asJava
-        )
-        statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
-        statisticRecorder.recordStatisticsForDriver(statistic, queryModel.getQueryId())
-        statistic = new QueryStatistic()
-        var i = 0
-        // Create Spark Partition for each task and assign blocks
-        nodeBlockMapping.asScala.foreach { entry =>
-          entry._2.asScala.foreach { blocksPerTask => {
-            val tableBlockInfo = blocksPerTask.asScala.map(_.asInstanceOf[TableBlockInfo])
-            if (blocksPerTask.size() != 0) {
-              result
-                .add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, tableBlockInfo.asJava))
-              i += 1
-            }
+      // create a list of block based on split
+      val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
+
+      // get the list of executors and map blocks to executors based on locality
+      val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
+
+      // divide the blocks among the tasks of the nodes as per the data locality
+      val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
+        parallelism, activeNodes.toList.asJava)
+
+      statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
+      statisticRecorder.recordStatisticsForDriver(statistic, queryId)
+      statistic = new QueryStatistic()
+
+      var i = 0
+      // Create Spark Partition for each task and assign blocks
+      nodeBlockMapping.asScala.foreach { case (node, blockList) =>
+        blockList.asScala.foreach { blocksPerTask =>
+          val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
+          if (blocksPerTask.size() != 0) {
+            val multiBlockSplit = new CarbonMultiBlockSplit(identifier, splits.asJava, node)
+            val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
+            result.add(partition)
+            i += 1
           }
-          }
-        }
-        val noOfBlocks = blockList.size
-        val noOfNodes = nodeBlockMapping.size
-        val noOfTasks = result.size()
-        logInfo(s"Identified  no.of.Blocks: $noOfBlocks,"
-                + s"parallelism: $defaultParallelism , " +
-                s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
-        )
-        statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
-          System.currentTimeMillis)
-        statisticRecorder.recordStatisticsForDriver(statistic, queryModel.getQueryId())
-        statisticRecorder.logStatisticsAsTableDriver()
-        result.asScala.foreach { r =>
-          val cp = r.asInstanceOf[CarbonSparkPartition]
-          logInfo(s"Node: ${ cp.locations.toSeq.mkString(",") }" +
-                  s", No.Of Blocks:  ${ cp.tableBlockInfos.size() }"
-          )
         }
-      } else {
-        logInfo("No blocks identified to scan")
       }
-    } else {
-      logInfo("No valid segments found to scan")
+
+      noOfBlocks = splits.size
+      noOfNodes = nodeBlockMapping.size
+      noOfTasks = result.size()
+
+      statistic = new QueryStatistic()
+      statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
+        System.currentTimeMillis)
+      statisticRecorder.recordStatisticsForDriver(statistic, queryId)
+      statisticRecorder.logStatisticsAsTableDriver()
     }
+    logInfo(
+      s"""
+         | Identified no.of.blocks: $noOfBlocks,
+         | no.of.tasks: $noOfTasks,
+         | no.of.nodes: $noOfNodes,
+         | parallelism: $parallelism
+       """.stripMargin)
     result.toArray(new Array[Partition](result.size()))
   }
 
-  override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val iter = new Iterator[V] {
-      var rowIterator: CarbonIterator[Array[Any]] = _
-      var queryStartTime: Long = 0
-      val queryExecutor = QueryExecutorFactory.getQueryExecutor()
-      try {
-        context.addTaskCompletionListener(context => {
-          clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
-          logStatistics()
-          queryExecutor.finish
-        })
-        val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
-        if (!carbonSparkPartition.tableBlockInfos.isEmpty) {
-          queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
-          // fill table block info
-          queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
-          queryStartTime = System.currentTimeMillis
-          val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
-          logInfo("*************************" + carbonPropertiesFilePath)
-          if (null == carbonPropertiesFilePath) {
-            System.setProperty("carbon.properties.filepath",
-              System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
-          }
-          // execute query
-          rowIterator = new ChunkRowIterator(
-            queryExecutor.execute(queryModel).
-              asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
+  override def compute(split: Partition, context: TaskContext): Iterator[V] = {
+    val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
+    if (null == carbonPropertiesFilePath) {
+      System.setProperty("carbon.properties.filepath",
+        System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
+      )
+    }
 
-        }
-      } catch {
-        case e: Exception =>
-          LOGGER.error(e)
-          if (null != e.getMessage) {
-            sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
-          } else {
-            sys.error("Exception occurred in query execution.Please check logs.")
-          }
-      }
+    val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+    val attemptContext = newTaskAttemptContext(new Configuration(), attemptId)
+    val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
+    val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+    val reader = format.createRecordReader(inputSplit, attemptContext)
+    reader.initialize(inputSplit, attemptContext)
+
+    val queryStartTime = System.currentTimeMillis
 
-      var havePair = false
-      var finished = false
-      var recordCount = 0
+    new Iterator[V] {
+      private var havePair = false
+      private var finished = false
+      private var count = 0
+
+      context.addTaskCompletionListener { context =>
+        logStatistics(queryStartTime, count)
+        reader.close()
+      }
 
       override def hasNext: Boolean = {
+        if (context.isInterrupted) {
+          throw new TaskKilledException
+        }
         if (!finished && !havePair) {
-          finished = (null == rowIterator) || (!rowIterator.hasNext)
+          finished = !reader.nextKeyValue
+          if (finished) {
+            reader.close()
+          }
           havePair = !finished
         }
         !finished
@@ -241,68 +201,55 @@ class CarbonScanRDD[V: ClassTag](
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-        recordCount += 1
-        keyClass.getValue(rowIterator.next())
+        val value: V = reader.getCurrentValue
+        count += 1
+        value
       }
+    }
+  }
 
-      def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = {
-        if (null != columnToDictionaryMap) {
-          org.apache.carbondata.spark.util.CarbonQueryUtil
-            .clearColumnDictionaryCache(columnToDictionaryMap)
-        }
-      }
+  private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[V] = {
+    CarbonInputFormat.setCarbonTable(conf, carbonTable)
+    createInputFormat(conf)
+  }
 
-      def logStatistics(): Unit = {
-        if (null != queryModel.getStatisticsRecorder) {
-          var queryStatistic = new QueryStatistic()
-          queryStatistic
-            .addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
-              System.currentTimeMillis - queryStartTime
-            )
-          queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
-          // result size
-          queryStatistic = new QueryStatistic()
-          queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
-          queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
-          // print executor query statistics for each task_id
-          queryModel.getStatisticsRecorder.logStatisticsAsTableExecutor()
-        }
-      }
+  private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[V] = {
+    CarbonInputFormat.setCarbonReadSupport(classOf[RawDataReadSupport], conf)
+    createInputFormat(conf)
+  }
+
+  private def createInputFormat(conf: Configuration): CarbonInputFormat[V] = {
+    val format = new CarbonInputFormat[V]
+    CarbonInputFormat.setTablePath(conf, identifier.getTablePath)
+    CarbonInputFormat.setFilterPredicates(conf, filterExpression)
+    val projection = new CarbonProjection
+    columnProjection.foreach { attr =>
+      projection.addColumn(attr.name)
     }
+    CarbonInputFormat.setColumnProjection(conf, projection)
+    format
+  }
 
-    iter
+  def logStatistics(queryStartTime: Long, recordCount: Int): Unit = {
+    var queryStatistic = new QueryStatistic()
+    queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+      System.currentTimeMillis - queryStartTime)
+    val statisticRecorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId)
+    statisticRecorder.recordStatistics(queryStatistic)
+    // result size
+    queryStatistic = new QueryStatistic()
+    queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
+    statisticRecorder.recordStatistics(queryStatistic)
+    // print executor query statistics for each task_id
+    statisticRecorder.logStatisticsAsTableExecutor()
   }
 
   /**
    * Get the preferred locations where to launch this task.
    */
-  override def getPreferredLocations(partition: Partition): Seq[String] = {
-    val theSplit = partition.asInstanceOf[CarbonSparkPartition]
-    val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
-    val tableBlocks = theSplit.tableBlockInfos
-    // node name and count mapping
-    val blockMap = new util.LinkedHashMap[String, Integer]()
-
-    tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
-      location => {
-        if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
-          val currentCount = blockMap.get(location)
-          if (currentCount == null) {
-            blockMap.put(location, 1)
-          } else {
-            blockMap.put(location, currentCount + 1)
-          }
-        }
-      }
-    )
-    )
-
-    val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
-      nodeCount1.getValue > nodeCount2.getValue
-    }
-    )
-
-    val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
-    firstOptionLocation ++ sortedNodesList
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val theSplit = split.asInstanceOf[CarbonSparkPartition]
+    val firstOptionLocation = theSplit.split.value.getLocations.filter(_ != "localhost")
+    firstOptionLocation
   }
 }
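
The rewritten compute() above drops the QueryExecutor/ChunkRowIterator path and drives the standard Hadoop MapReduce read API instead: CarbonInputFormat creates a RecordReader for the partition's CarbonMultiBlockSplit, and the RDD wraps that reader in a Spark Iterator with the usual hasNext/next bookkeeping. A minimal, self-contained sketch of that wrapping pattern follows; the helper name asIterator is illustrative, while RecordReader and TaskContext are the APIs the patch actually uses.

import org.apache.hadoop.mapreduce.RecordReader
import org.apache.spark.TaskContext

// Sketch: expose a Hadoop RecordReader as a Spark Iterator, mirroring the
// hasNext/next/close bookkeeping used in CarbonScanRDD.compute() above.
def asIterator[K, V](reader: RecordReader[K, V], context: TaskContext): Iterator[V] =
  new Iterator[V] {
    private var havePair = false   // a value was fetched by hasNext but not yet returned
    private var finished = false

    // make sure the reader is closed when the task completes, even on failure
    context.addTaskCompletionListener { _ => reader.close() }

    override def hasNext: Boolean = {
      if (!finished && !havePair) {
        finished = !reader.nextKeyValue()
        if (finished) reader.close()
        havePair = !finished
      }
      !finished
    }

    override def next(): V = {
      if (!hasNext) throw new java.util.NoSuchElementException("End of stream")
      havePair = false
      reader.getCurrentValue
    }
  }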

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 9c9be8d..5fdbc5d 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -73,17 +73,8 @@ object Compactor {
       maxSegmentColumnSchemaList = null
     )
     carbonLoadModel.setStorePath(carbonMergerMapping.storePath)
-    val segmentStatusManager = new SegmentStatusManager(new AbsoluteTableIdentifier
-    (CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION),
-      new CarbonTableIdentifier(carbonLoadModel.getDatabaseName,
-        carbonLoadModel.getTableName,
-        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId
-      )
-    )
-    )
-    carbonLoadModel.setLoadMetadataDetails(segmentStatusManager
-      .readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava
-    )
+    carbonLoadModel.setLoadMetadataDetails(
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
     var execInstance = "1"
     // in case of non dynamic executor allocation, number of executors are fixed.
     if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
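
The Compactor change above is part of a broader move in this patch from per-call SegmentStatusManager instances to static helpers (readLoadMetadata, updateDeletionStatus, getSegmentStatus, getTableStatusLastModifiedTime); the same rewrite appears below in carbonTableSchema.scala and the compaction tests. A condensed sketch of the migrated calls, using only signatures that occur in this diff (the wrapper defs are illustrative):

import scala.collection.JavaConverters._

import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
import org.apache.carbondata.lcm.status.SegmentStatusManager

// before: val mgr = new SegmentStatusManager(identifier)
//         mgr.readLoadMetadata(path) / mgr.getValidAndInvalidSegments
// after:  static helpers on SegmentStatusManager, no instance needed
def loadMetadata(metaDataFilePath: String) =
  SegmentStatusManager.readLoadMetadata(metaDataFilePath).toList

def validSegments(identifier: AbsoluteTableIdentifier) =
  SegmentStatusManager.getSegmentStatus(identifier).getValidSegments.asScala.toList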

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
deleted file mode 100644
index c55c807..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
-
-
-/**
- * All the utility functions for carbon plan creation
- */
-object QueryPlanUtil {
-
-  /**
-   * createCarbonInputFormat from query model
-   */
-  def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
-  (CarbonInputFormat[Array[Object]], Job) = {
-    val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
-    val jobConf: JobConf = new JobConf(new Configuration)
-    val job: Job = new Job(jobConf)
-    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
-    (carbonInputFormat, job)
-  }
-
-  def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
-      conf: Configuration) : CarbonInputFormat[V] = {
-    val carbonInputFormat = new CarbonInputFormat[V]()
-    val job: Job = new Job(conf)
-    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
-    carbonInputFormat
-  }
-}
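
With QueryPlanUtil deleted, this patch builds a CarbonInputFormat in two places: CarbonScanRDD.createInputFormat (shown above) for the scan path, and CarbonInputFormatUtil.createCarbonInputFormat from the hadoop module for CarbonDatasourceHadoopRelation (shown below). A rough equivalent of the old helper under the new API, based only on calls visible in this diff (the def name formatFor is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil

// Sketch of what replaces QueryPlanUtil.createCarbonInputFormat: the format is created by
// CarbonInputFormatUtil, and the caller still points the configuration at the table path,
// as CarbonDatasourceHadoopRelation does below.
def formatFor(identifier: AbsoluteTableIdentifier, conf: Configuration) = {
  val job: Job = new Job(conf)
  conf.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
  CarbonInputFormatUtil.createCarbonInputFormat(identifier, job)
}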

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index c9d2a0f..ca0ad58 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -38,14 +38,13 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
-import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.carbon.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
-import org.apache.carbondata.hadoop.util.SchemaReader
+import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
 import org.apache.carbondata.scan.expression.logical.AndExpression
 import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
-import org.apache.carbondata.spark.util.QueryPlanUtil
 
 
 private[sql] case class CarbonDatasourceHadoopRelation(
@@ -104,7 +103,7 @@ private[sql] case class CarbonDatasourceHadoopRelation(
 
     val projection = new CarbonProjection
     requiredColumns.foreach(projection.addColumn)
-    CarbonInputFormat.setColumnProjection(projection, conf)
+    CarbonInputFormat.setColumnProjection(conf, projection)
     CarbonInputFormat.setCarbonReadSupport(classOf[SparkRowReadSupportImpl], conf)
 
     new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
@@ -145,12 +144,11 @@ class CarbonHadoopFSRDD[V: ClassTag](
     context: TaskContext): Iterator[V] = {
     val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
     val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId)
-    val inputFormat = QueryPlanUtil.createCarbonInputFormat(identifier,
-      hadoopAttemptContext.getConfiguration
-    )
+    val job: Job = new Job(hadoopAttemptContext.getConfiguration)
+    val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, job)
     hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
     val reader =
-      inputFormat.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
+      format.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
         hadoopAttemptContext
       )
     reader.initialize(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
@@ -186,11 +184,9 @@ class CarbonHadoopFSRDD[V: ClassTag](
 
   override protected def getPartitions: Array[Partition] = {
     val jobContext = newJobContext(conf.value, jobId)
-    val carbonInputFormat = QueryPlanUtil.createCarbonInputFormat(identifier,
-      jobContext.getConfiguration
-    )
+    val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, new Job(conf.value))
     jobContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
-    val splits = carbonInputFormat.getSplits(jobContext).toArray
+    val splits = format.getSplits(jobContext).toArray
     val carbonInputSplits = splits
       .map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit]))
     carbonInputSplits.zipWithIndex.map(f => new CarbonHadoopFSPartition(id, f._2, f._1))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 069e106..a06d5cb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -272,9 +272,8 @@ case class CarbonRelation(
   private var sizeInBytesLocalValue = 0L
 
   def sizeInBytes: Long = {
-    val tableStatusNewLastUpdatedTime = new SegmentStatusManager(
+    val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
       tableMeta.carbonTable.getAbsoluteTableIdentifier)
-        .getTableStatusLastModifiedTime
     if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
       val tablePath = CarbonStorePath.getCarbonTablePath(
         tableMeta.storePath,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
deleted file mode 100644
index c105cae..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.ArrayList
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.LeafNode
-import org.apache.spark.sql.hive.CarbonMetastoreCatalog
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.scan.model._
-import org.apache.carbondata.spark.{CarbonFilters, RawValue, RawValueImpl}
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
-
-case class CarbonScan(
-    var attributesRaw: Seq[Attribute],
-    relationRaw: CarbonRelation,
-    dimensionPredicatesRaw: Seq[Expression],
-    useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode {
-  val carbonTable = relationRaw.metaData.carbonTable
-  val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
-  val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
-  @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
-
-  val attributesNeedToDecode = new java.util.LinkedHashSet[AttributeReference]()
-  val unprocessedExprs = new ArrayBuffer[Expression]()
-
-  val buildCarbonPlan: CarbonQueryPlan = {
-    val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.databaseName, relationRaw.tableName)
-
-    plan.setSortedDimemsions(new ArrayList[QueryDimension])
-
-    plan.setOutLocationPath(
-      CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS))
-    plan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
-    processFilterExpressions(plan)
-    plan
-  }
-
-  def processFilterExpressions(plan: CarbonQueryPlan) {
-    if (dimensionPredicatesRaw.nonEmpty) {
-      val expressionVal = CarbonFilters.processExpression(
-        dimensionPredicatesRaw,
-        attributesNeedToDecode,
-        unprocessedExprs,
-        carbonTable)
-      expressionVal match {
-        case Some(ce) =>
-          // adding dimension used in expression in querystats
-          plan.setFilterExpression(ce)
-        case _ =>
-      }
-    }
-    processExtraAttributes(plan)
-  }
-
-  private def processExtraAttributes(plan: CarbonQueryPlan) {
-    if (attributesNeedToDecode.size() > 0) {
-      val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
-
-      attributesNeedToDecode.asScala.foreach { attr =>
-        if (!attributesRaw.exists(_.name.equalsIgnoreCase(attr.name))) {
-          attributeOut += attr
-        }
-      }
-      attributesRaw = attributeOut
-    }
-
-    val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
-    val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
-    val dimAttr = new Array[Attribute](dimensions.size())
-    val msrAttr = new Array[Attribute](measures.size())
-    attributesRaw.foreach { attr =>
-      val carbonDimension =
-        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-      if(carbonDimension != null) {
-        dimAttr(dimensions.indexOf(carbonDimension)) = attr
-      } else {
-        val carbonMeasure =
-          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
-        if(carbonMeasure != null) {
-          msrAttr(measures.indexOf(carbonMeasure)) = attr
-        }
-      }
-    }
-
-    attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
-
-    var queryOrder: Integer = 0
-    attributesRaw.foreach { attr =>
-      val carbonDimension =
-        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-      if (carbonDimension != null) {
-        val dim = new QueryDimension(attr.name)
-        dim.setQueryOrder(queryOrder)
-        queryOrder = queryOrder + 1
-        selectedDims += dim
-      } else {
-        val carbonMeasure =
-          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
-        if (carbonMeasure != null) {
-          val m1 = new QueryMeasure(attr.name)
-          m1.setQueryOrder(queryOrder)
-          queryOrder = queryOrder + 1
-          selectedMsrs += m1
-        }
-      }
-    }
-
-    // Fill the selected dimensions & measures obtained from
-    // attributes to query plan  for detailed query
-    selectedDims.foreach(plan.addDimension)
-    selectedMsrs.foreach(plan.addMeasure)
-  }
-
-
-  def inputRdd: CarbonScanRDD[Array[Any]] = {
-
-    val conf = new Configuration()
-    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val model = QueryModel.createModel(
-      absoluteTableIdentifier, buildCarbonPlan, carbonTable)
-    val kv: RawValue[Array[Any]] = new RawValueImpl
-    // setting queryid
-    buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
-
-    val tableCreationTime = carbonCatalog
-        .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
-    val schemaLastUpdatedTime = carbonCatalog
-        .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
-    val big = new CarbonScanRDD(
-      ocRaw.sparkContext,
-      model,
-      buildCarbonPlan.getFilterExpression,
-      kv,
-      conf,
-      tableCreationTime,
-      schemaLastUpdatedTime,
-      carbonCatalog.storePath)
-    big
-  }
-
-
-  override def outputsUnsafeRows: Boolean =
-    (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
-
-  override def doExecute(): RDD[InternalRow] = {
-    val outUnsafeRows: Boolean = (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
-    inputRdd.mapPartitions { iter =>
-      val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
-      new Iterator[InternalRow] {
-        override def hasNext: Boolean = iter.hasNext
-
-        override def next(): InternalRow =
-          if (outUnsafeRows) {
-            unsafeProjection(new GenericMutableRow(iter.next()))
-          } else {
-            new GenericMutableRow(iter.next())
-          }
-      }
-    }
-  }
-
-  def output: Seq[Attribute] = {
-    attributesRaw
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
new file mode 100644
index 0000000..6580c4f
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.ArrayList
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.LeafNode
+import org.apache.spark.sql.hive.CarbonMetastoreCatalog
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.scan.model._
+import org.apache.carbondata.spark.CarbonFilters
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+case class CarbonScan(
+    var attributesRaw: Seq[Attribute],
+    relationRaw: CarbonRelation,
+    dimensionPredicatesRaw: Seq[Expression],
+    useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode {
+  val carbonTable = relationRaw.metaData.carbonTable
+  val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
+  val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
+  @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
+
+  val attributesNeedToDecode = new java.util.LinkedHashSet[AttributeReference]()
+  val unprocessedExprs = new ArrayBuffer[Expression]()
+
+  val buildCarbonPlan: CarbonQueryPlan = {
+    val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.databaseName, relationRaw.tableName)
+
+    plan.setSortedDimemsions(new ArrayList[QueryDimension])
+
+    plan.setOutLocationPath(
+      CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS))
+    plan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
+    processFilterExpressions(plan)
+    plan
+  }
+
+  def processFilterExpressions(plan: CarbonQueryPlan) {
+    if (dimensionPredicatesRaw.nonEmpty) {
+      val expressionVal = CarbonFilters.processExpression(
+        dimensionPredicatesRaw,
+        attributesNeedToDecode,
+        unprocessedExprs,
+        carbonTable)
+      expressionVal match {
+        case Some(ce) =>
+          // adding dimension used in expression in querystats
+          plan.setFilterExpression(ce)
+        case _ =>
+      }
+    }
+    processExtraAttributes(plan)
+  }
+
+  private def processExtraAttributes(plan: CarbonQueryPlan) {
+    if (attributesNeedToDecode.size() > 0) {
+      val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
+
+      attributesNeedToDecode.asScala.foreach { attr =>
+        if (!attributesRaw.exists(_.name.equalsIgnoreCase(attr.name))) {
+          attributeOut += attr
+        }
+      }
+      attributesRaw = attributeOut
+    }
+
+    val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
+    val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
+    val dimAttr = new Array[Attribute](dimensions.size())
+    val msrAttr = new Array[Attribute](measures.size())
+    attributesRaw.foreach { attr =>
+      val carbonDimension =
+        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+      if(carbonDimension != null) {
+        dimAttr(dimensions.indexOf(carbonDimension)) = attr
+      } else {
+        val carbonMeasure =
+          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
+        if(carbonMeasure != null) {
+          msrAttr(measures.indexOf(carbonMeasure)) = attr
+        }
+      }
+    }
+
+    attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
+
+    var queryOrder: Integer = 0
+    attributesRaw.foreach { attr =>
+      val carbonDimension =
+        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+      if (carbonDimension != null) {
+        val dim = new QueryDimension(attr.name)
+        dim.setQueryOrder(queryOrder)
+        queryOrder = queryOrder + 1
+        selectedDims += dim
+      } else {
+        val carbonMeasure =
+          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
+        if (carbonMeasure != null) {
+          val m1 = new QueryMeasure(attr.name)
+          m1.setQueryOrder(queryOrder)
+          queryOrder = queryOrder + 1
+          selectedMsrs += m1
+        }
+      }
+    }
+
+    // Fill the selected dimensions & measures obtained from
+    // attributes to query plan  for detailed query
+    selectedDims.foreach(plan.addDimension)
+    selectedMsrs.foreach(plan.addMeasure)
+  }
+
+  def inputRdd: CarbonScanRDD[Array[Any]] = {
+
+    val conf = new Configuration()
+    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+
+    // setting queryid
+    buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
+
+    val tableCreationTime = carbonCatalog
+        .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
+    val schemaLastUpdatedTime = carbonCatalog
+        .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
+    new CarbonScanRDD(
+      ocRaw.sparkContext,
+      attributesRaw,
+      buildCarbonPlan.getFilterExpression,
+      absoluteTableIdentifier,
+      carbonTable
+    )
+  }
+
+  override def outputsUnsafeRows: Boolean =
+    (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
+
+  override def doExecute(): RDD[InternalRow] = {
+    val outUnsafeRows: Boolean = (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
+    inputRdd.mapPartitions { iter =>
+      val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+      new Iterator[InternalRow] {
+        override def hasNext: Boolean = iter.hasNext
+
+        override def next(): InternalRow = {
+          val value = iter.next
+          if (outUnsafeRows) {
+            unsafeProjection(new GenericMutableRow(value))
+          } else {
+            new GenericMutableRow(value)
+          }
+        }
+      }
+    }
+  }
+
+  def output: Seq[Attribute] = {
+    attributesRaw
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index a6b4ec5..74b0dd2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -924,10 +924,9 @@ private[sql] case class DeleteLoadsById(
     }
     val path = carbonTable.getMetaDataFilepath
 
-    val segmentStatusManager =
-      new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
     try {
-      val invalidLoadIds = segmentStatusManager.updateDeletionStatus(loadids.asJava, path).asScala
+      val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
+        carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
 
       if (invalidLoadIds.isEmpty) {
 
@@ -986,8 +985,6 @@ private[sql] case class DeleteLoadsByLoadDate(
 
     val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
       .getCarbonTable(dbName + '_' + tableName)
-    val segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
-
     if (null == carbonTable) {
       var relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
         .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
@@ -995,8 +992,9 @@ private[sql] case class DeleteLoadsByLoadDate(
     val path = carbonTable.getMetaDataFilepath()
 
     try {
-      val invalidLoadTimestamps = segmentStatusManager
-        .updateDeletionStatus(loadDate, path, timeObj.asInstanceOf[java.lang.Long]).asScala
+      val invalidLoadTimestamps = SegmentStatusManager.updateDeletionStatus(
+        carbonTable.getAbsoluteTableIdentifier, loadDate, path,
+        timeObj.asInstanceOf[java.lang.Long]).asScala
       if (invalidLoadTimestamps.isEmpty) {
         LOGGER.audit(s"Delete segment by date is successfull for $dbName.$tableName.")
       }
@@ -1328,12 +1326,8 @@ private[sql] case class ShowLoads(
     if (carbonTable == null) {
       sys.error(s"$databaseName.$tableName is not found")
     }
-    val path = carbonTable.getMetaDataFilepath()
-
-    val segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
-
-    val loadMetadataDetailsArray = segmentStatusManager.readLoadMetadata(path)
-
+    val path = carbonTable.getMetaDataFilepath
+    val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path)
     if (loadMetadataDetailsArray.nonEmpty) {
 
       val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 0c13293..25c36c5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.sql.hive
 
-
 import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
 
 import scala.collection.JavaConverters._
@@ -44,12 +43,9 @@ object DistributionUtil {
    * localhost for retriving executor list
    */
   def getNodeList(sparkContext: SparkContext): Array[String] = {
-
-    val arr =
-      sparkContext.getExecutorMemoryStatus.map {
-        kv =>
-          kv._1.split(":")(0)
-      }.toSeq
+    val arr = sparkContext.getExecutorMemoryStatus.map { kv =>
+      kv._1.split(":")(0)
+    }.toSeq
     val localhostIPs = getLocalhostIPs
     val selectedLocalIPList = localhostIPs.filter(arr.contains(_))
 
@@ -109,10 +105,9 @@ object DistributionUtil {
    * @param sparkContext
    * @return
    */
-  def ensureExecutorsAndGetNodeList(blockList: Array[Distributable],
-      sparkContext: SparkContext):
-  Array[String] = {
-    val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
+  def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
+    sparkContext: SparkContext): Seq[String] = {
+    val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
     var confExecutorsTemp: String = null
     if (sparkContext.getConf.contains("spark.executor.instances")) {
       confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
@@ -131,7 +126,9 @@ object DistributionUtil {
     }
     val requiredExecutors = if (nodeMapping.size > confExecutors) {
       confExecutors
-    } else { nodeMapping.size() }
+    } else {
+      nodeMapping.size()
+    }
 
     val startTime = System.currentTimeMillis()
     CarbonContext.ensureExecutors(sparkContext, requiredExecutors)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index cc00c47..f02d4e7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@ -35,10 +35,9 @@ import org.apache.carbondata.core.util.CarbonProperties
 class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
-
+    clean
     val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
       .getCanonicalPath
-
     sql("create table if not exists Carbon_automation_test (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active
 _phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_INCLUDE'='Latest_MONTH,Latest_DAY,deviceInformationId')");
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
@@ -52,11 +51,14 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
 
   }
 
+  def clean{
+    sql("drop table if exists Carbon_automation_test")
+    sql("drop table if exists Carbon_automation_hive")
+    sql("drop table if exists Carbon_automation_test_hive")
+  }
+  
   override def afterAll {
-    sql("drop table Carbon_automation_test")
-    sql("drop table Carbon_automation_hive")
-    sql("drop table Carbon_automation_test_hive")
-
+    clean
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
   }
@@ -425,10 +427,10 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
   })
 
   //TC_103
-  test("select variance(deviceInformationId) as a   from Carbon_automation_test")({
+  test("select variance(deviceInformationId) as a   from carbon_automation_test")({
     checkAnswer(
-      sql("select variance(deviceInformationId) as a   from Carbon_automation_test"),
-      sql("select variance(deviceInformationId) as a   from Carbon_automation_hive"))
+      sql("select variance(deviceInformationId) as a   from carbon_automation_test"),
+      sql("select variance(deviceInformationId) as a   from carbon_automation_hive"))
   })
    //TC_105
   test("select var_samp(deviceInformationId) as a  from Carbon_automation_test")({

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
index 7343a81..924d91a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
@@ -113,26 +113,22 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll {
     sql("clean files for table table2")
 
     // check for table 1.
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier1 = new AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table1", "rrr")
-        )
-    )
+          new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table1", "rrr"))
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier1)
+        .getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(!segments.contains("0"))
     assert(!segments.contains("1"))
     // check for table 2.
-    val segmentStatusManager2: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier2 = new AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table2", "rrr1")
-        )
-    )
+          new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table2", "rrr1"))
     // merged segment should not be there
-    val segments2 = segmentStatusManager2.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments2 = SegmentStatusManager.getSegmentStatus(identifier2)
+        .getValidSegments.asScala.toList
     assert(segments2.contains("0.1"))
     assert(!segments2.contains("0"))
     assert(!segments2.contains("1"))

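The pattern in this file repeats across the remaining compaction tests: instead of instantiating a SegmentStatusManager per table, each test builds an AbsoluteTableIdentifier and calls the static SegmentStatusManager.getSegmentStatus(...). A minimal sketch of the new call shape, assuming this revision's package layout for the imports and using a hypothetical store path and table coordinates:

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
    import org.apache.carbondata.lcm.status.SegmentStatusManager

    // Hypothetical store path and table coordinates, for illustration only.
    val identifier = new AbsoluteTableIdentifier(
      "/tmp/carbon/store",
      new CarbonTableIdentifier("default", "table1", "rrr"))

    // Static lookup replaces new SegmentStatusManager(...).getValidAndInvalidSegments
    val validSegments = SegmentStatusManager.getSegmentStatus(identifier)
      .getValidSegments.asScala.toList

    // After minor compaction and "clean files", only the merged segment remains.
    assert(validSegments.contains("0.1"))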
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
index 80a2320..f5039a7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
@@ -43,9 +43,6 @@ class DataCompactionBoundaryConditionsTest extends QueryTest with BeforeAndAfter
   val carbonTableIdentifier: CarbonTableIdentifier =
     new CarbonTableIdentifier("default", "boundarytest".toLowerCase(), "1")
 
-  val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-      AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier))
-
   override def beforeAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,2")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
index d780efe..f77ec9b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
@@ -86,13 +86,13 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter
     var noOfRetries = 0
     while (status && noOfRetries < 10) {
 
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-          AbsoluteTableIdentifier(
+      val identifier = new AbsoluteTableIdentifier(
             CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-            new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "cardinalityTest", "1")
+            new CarbonTableIdentifier(
+              CarbonCommonConstants.DATABASE_DEFAULT_NAME, "cardinalityTest", "1")
           )
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+      val segments = SegmentStatusManager.getSegmentStatus(identifier)
+          .getValidSegments.asScala.toList
 
       if (!segments.contains("0.1")) {
         // wait for 2 seconds for compaction to complete.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
index eb889d6..7ec6431 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -105,16 +105,11 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
     * Compaction should fail as lock is being held purposefully
     */
   test("check if compaction is failed or not.") {
-
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
-        absoluteTableIdentifier
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
-
+      val segments = SegmentStatusManager.getSegmentStatus(absoluteTableIdentifier)
+          .getValidSegments.asScala.toList
       if (!segments.contains("0.1")) {
         assert(true)
-      }
-      else {
+      } else {
         assert(false)
       }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
index fbb39d8..15ed78b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
@@ -45,8 +45,7 @@ class DataCompactionMinorThresholdTest extends QueryTest with BeforeAndAfterAll
   val carbonTableIdentifier: CarbonTableIdentifier =
     new CarbonTableIdentifier("default", "minorthreshold".toLowerCase(), "1")
 
-  val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-      AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier))
+  val identifier = new AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier)
 
   override def beforeAll {
     CarbonProperties.getInstance()
@@ -96,7 +95,8 @@ class DataCompactionMinorThresholdTest extends QueryTest with BeforeAndAfterAll
 
     sql("clean files for table minorthreshold")
 
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier)
+        .getValidSegments.asScala.toList
 
     assert(segments.contains("0.2"))
     assert(!segments.contains("0.1"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
index c7be22f..570bb72 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
@@ -38,14 +38,10 @@ class DataCompactionNoDictionaryTest extends QueryTest with BeforeAndAfterAll {
 
   // return segment details
   def getSegments(databaseName : String, tableName : String, tableId : String): List[String] = {
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier = new AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId)
-        )
-    )
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
-    segments
+          new CarbonTableIdentifier(databaseName, tableName.toLowerCase, tableId))
+    SegmentStatusManager.getSegmentStatus(identifier).getValidSegments.asScala.toList
   }
 
   val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
index 3eef8b7..137cebc 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
@@ -84,13 +84,12 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
     var noOfRetries = 0
     while (status && noOfRetries < 10) {
 
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-          AbsoluteTableIdentifier(
+      val identifier = new AbsoluteTableIdentifier(
             CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
             new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "1")
           )
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+      val segments = SegmentStatusManager.getSegmentStatus(identifier)
+          .getValidSegments.asScala.toList
 
       if (!segments.contains("0.1")) {
         // wait for 2 seconds for compaction to complete.
@@ -131,15 +130,14 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
     // delete merged segments
     sql("clean files for table normalcompaction")
 
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier = new AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
           new CarbonTableIdentifier(
             CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "uniqueid")
         )
-    )
     // merged segment should not be there
-    val segments   = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier)
+        .getValidSegments.asScala.toList
     assert(!segments.contains("0"))
     assert(!segments.contains("1"))
     assert(!segments.contains("2"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 99e3d56..a1664a6 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -97,14 +97,13 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     var noOfRetries = 0
     while (!status && noOfRetries < 10) {
 
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-          AbsoluteTableIdentifier(
+      val identifier = new AbsoluteTableIdentifier(
             CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
             new CarbonTableIdentifier(
               CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", noOfRetries + "")
           )
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+      val segments = SegmentStatusManager.getSegmentStatus(identifier)
+          .getValidSegments.asScala.toList
       segments.foreach(seg =>
         System.out.println( "valid segment is =" + seg)
       )
@@ -129,15 +128,14 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     // delete merged segments
     sql("clean files for table ignoremajor")
 
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier = new AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
           new CarbonTableIdentifier(
             CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
         )
-    )
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier)
+        .getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(segments.contains("2.1"))
     assert(!segments.contains("2"))
@@ -156,13 +154,6 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     catch {
       case _:Throwable => assert(true)
     }
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
-          CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(
-            CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
-        )
-    )
     val carbontablePath = CarbonStorePath
       .getCarbonTablePath(CarbonProperties.getInstance
         .getProperty(CarbonCommonConstants.STORE_LOCATION),
@@ -170,7 +161,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
           CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
       )
       .getMetadataDirectoryPath
-    var segs = segmentStatusManager.readLoadMetadata(carbontablePath)
+    val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
 
     // status should remain as compacted.
     assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED))
@@ -185,13 +176,6 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
       "DELETE SEGMENTS FROM TABLE ignoremajor where STARTTIME before" +
         " '2222-01-01 19:35:01'"
     )
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
-          CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-          new CarbonTableIdentifier(
-            CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
-        )
-    )
     val carbontablePath = CarbonStorePath
       .getCarbonTablePath(CarbonProperties.getInstance
         .getProperty(CarbonCommonConstants.STORE_LOCATION),
@@ -199,7 +183,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
           CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
       )
       .getMetadataDirectoryPath
-    var segs = segmentStatusManager.readLoadMetadata(carbontablePath)
+    val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
 
     // status should remain as compacted for segment 2.
     assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED))

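The metadata checks in this file drop the SegmentStatusManager field as well: readLoadMetadata is now a static call taking the table's metadata directory path, and the returned LoadMetadataDetails entries carry the load status asserted above. A short sketch, with the directory hard-coded as a placeholder for the value the test derives via CarbonStorePath:

    // "metadataDir" is a hypothetical placeholder for
    // CarbonStorePath.getCarbonTablePath(storePath, tableIdentifier).getMetadataDirectoryPath
    val metadataDir = "/tmp/carbon/store/default/ignoremajor/Metadata"
    val loads = SegmentStatusManager.readLoadMetadata(metadataDir)

    // Each entry exposes the per-segment load status checked by the assertions above,
    // e.g. CarbonCommonConstants.SEGMENT_COMPACTED for compacted segments.
    loads.foreach(load => println(load.getLoadStatus))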
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index 3745e11..25087a7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -87,14 +87,13 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
     var noOfRetries = 0
     while (!status && noOfRetries < 10) {
 
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-          AbsoluteTableIdentifier(
+      val identifier = new AbsoluteTableIdentifier(
             CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
             new CarbonTableIdentifier(
               CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", noOfRetries + "")
           )
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+      val segments = SegmentStatusManager.getSegmentStatus(identifier)
+          .getValidSegments.asScala.toList
       segments.foreach(seg =>
         System.out.println( "valid segment is =" + seg)
       )
@@ -119,14 +118,13 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
     // delete merged segments
     sql("clean files for table stopmajor")
 
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier = new AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
           new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", "rrr")
         )
-    )
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier)
+        .getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(!segments.contains("0.2"))
     assert(!segments.contains("0"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index bed6428..6d3cdec 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -55,7 +55,6 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
       AbsoluteTableIdentifier(storeLocation,
         new CarbonTableIdentifier(
           CarbonCommonConstants.DATABASE_DEFAULT_NAME, "DataRetentionTable".toLowerCase(), "300"))
-  val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(absoluteTableIdentifierForRetention)
   val carbonTablePath = CarbonStorePath
     .getCarbonTablePath(absoluteTableIdentifierForRetention.getStorePath,
       absoluteTableIdentifierForRetention.getCarbonTableIdentifier).getMetadataDirectoryPath
@@ -133,8 +132,8 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("RetentionTest_DeleteSegmentsByLoadTime") {
-    val segments: Array[LoadMetadataDetails] = segmentStatusManager
-      .readLoadMetadata(carbonTablePath)
+    val segments: Array[LoadMetadataDetails] =
+      SegmentStatusManager.readLoadMetadata(carbonTablePath)
     // check segment length, it should be 2 (loads)
     if (segments.length != 2) {
       assert(false)