You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2020/12/11 01:46:07 UTC

[carbondata] branch master updated: [HOTFIX] Refact Carbon Util

This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new afb7626  [HOTFIX] Refact Carbon Util
afb7626 is described below

commit afb7626e6c4c31c93795eebcbcc337e83e595fb3
Author: Zhangshunyu <zh...@126.com>
AuthorDate: Fri Nov 27 11:17:55 2020 +0800

    [HOTFIX] Refact Carbon Util
    
    Why is this PR needed?
    Currently, we have several Carbon{$FUNCTION_NAME}Util classes, for example CarbonSparkUtil, CarbonQueryUtil, CarbonMergeUtil and CarbonLoadUtil. We also have CarbonUtil/CarbonUtils, which contain a mix of unrelated functions. We should clean up the code and move the functions in CarbonUtils to the specific Util class where each one belongs.
    
    What changes were proposed in this PR?
    Refactor the code to clean it up: rename CarbonUtils to CarbonThreadUtil and move collectCarbonRelation to CarbonSparkUtil.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #4029
---
 .../carbondata/benchmark/ConcurrentQueryBenchmark.scala    |  2 +-
 .../org/apache/carbondata/examples/S3CsvExample.scala      |  2 +-
 .../apache/carbondata/spark/rdd/CarbonTableCompactor.scala |  6 +++---
 .../org/apache/carbondata/spark/util/CarbonSparkUtil.scala | 10 ++++++++++
 .../org/apache/carbondata/view/MVManagerInSpark.scala      |  6 +++---
 .../scala/org/apache/carbondata/view/MVRefresher.scala     |  6 +++---
 .../apache/spark/sql/CarbonDatasourceHadoopRelation.scala  |  2 +-
 .../sql/{CarbonUtils.scala => CarbonThreadUtil.scala}      | 10 +---------
 .../sql/execution/command/management/CommonLoadUtils.scala |  4 ++--
 .../command/mutation/merge/CarbonMergeDataSetCommand.scala |  5 ++---
 .../apache/spark/sql/parser/CarbonExtensionSqlParser.scala |  4 ++--
 .../org/apache/spark/sql/parser/CarbonSparkSqlParser.scala |  4 ++--
 .../optimizer/CarbonSITransformationRule.scala             |  5 +++--
 .../sql/secondaryindex/rdd/SecondaryIndexCreator.scala     |  6 +++---
 .../TestSegmentReadingForMultiThreading.scala              | 14 +++++++-------
 .../spark/carbondata/query/TestFilterReordering.scala      |  6 +++---
 16 files changed, 47 insertions(+), 45 deletions(-)

diff --git a/examples/spark/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala b/examples/spark/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
index d1ca452..d49940c 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
@@ -513,7 +513,7 @@ object ConcurrentQueryBenchmark {
       .addProperty("carbon.blockletgroup.size.in.mb", "32")
       .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
       .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION, "false")
-    import org.apache.spark.sql.CarbonUtils._
+    import org.apache.spark.sql.CarbonThreadUtil._
 
     // 1. initParameters
     initParameters(args)
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala
index f9a5b90..01345b3 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala
@@ -35,7 +35,7 @@ object S3CsvExample {
                             + "../../../..").getCanonicalPath
     val logger: Logger = LoggerFactory.getLogger(this.getClass)
 
-    import org.apache.spark.sql.CarbonUtils._
+    import org.apache.spark.sql.CarbonThreadUtil._
     if (args.length != 4) {
       logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
                    "<s3.csv.location> <spark-master>")
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 9e1369b..a381089 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
 import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.spark.sql.{CarbonUtils, SparkSession, SQLContext}
+import org.apache.spark.sql.{CarbonThreadUtil, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
 import org.apache.spark.sql.execution.command.management.CommonLoadUtils
 import org.apache.spark.sql.util.SparkSQLUtil
@@ -433,7 +433,7 @@ class CarbonTableCompactor(
       carbonMergerMapping.validSegments)
     var loadResult: Array[(String, Boolean)] = null
     try {
-      CarbonUtils
+      CarbonThreadUtil
         .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
           table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName,
           splits.asScala.map(s => s.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
@@ -458,7 +458,7 @@ class CarbonTableCompactor(
           (row._1, FailureCauses.NONE == row._2._2.failureCauses)
         }
     } finally {
-      CarbonUtils
+      CarbonThreadUtil
         .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
           table.getDatabaseName + "." +
           table.getTableName)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index d3607c1..53e5e5c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -26,6 +26,9 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.types._
 
@@ -47,6 +50,13 @@ object CarbonSparkUtil {
       table)
   }
 
+  def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDatasourceHadoopRelation] = {
+    plan collect {
+      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+    }
+  }
+
   /**
    * return's the formatted column comment if column comment is present else empty("")
    *
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
index ebdc3e8..676c135 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
@@ -21,7 +21,7 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, CarbonUtils, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, SparkSession}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -29,7 +29,7 @@ import org.apache.carbondata.core.view.{MVCatalog, MVCatalogFactory, MVManager,
 
 class MVManagerInSpark(session: SparkSession) extends MVManager {
   override def getDatabases: util.List[String] = {
-    CarbonUtils.threadSet(CarbonCommonConstants.CARBON_ENABLE_MV, "true")
+    CarbonThreadUtil.threadSet(CarbonCommonConstants.CARBON_ENABLE_MV, "true")
     try {
       val databaseList = session.catalog.listDatabases()
       val databaseNameList = new util.ArrayList[String]()
@@ -38,7 +38,7 @@ class MVManagerInSpark(session: SparkSession) extends MVManager {
       }
       databaseNameList
     } finally {
-      CarbonUtils.threadUnset(CarbonCommonConstants.CARBON_ENABLE_MV)
+      CarbonThreadUtil.threadUnset(CarbonCommonConstants.CARBON_ENABLE_MV)
     }
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
index 152fbb4..31b5548 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 
 import com.google.gson.Gson
 import org.apache.log4j.Logger
-import org.apache.spark.sql.{CarbonUtils, SparkSession}
+import org.apache.spark.sql.{CarbonThreadUtil, SparkSession}
 import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
 import org.apache.spark.sql.parser.MVQueryParser
 
@@ -372,7 +372,7 @@ object MVRefresher {
    */
   private def setInputSegments(tableUniqueName: String,
       mainTableSegmentList: java.util.List[String]): Unit = {
-    CarbonUtils
+    CarbonThreadUtil
       .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
                  tableUniqueName, mainTableSegmentList.asScala.mkString(","))
   }
@@ -380,7 +380,7 @@ object MVRefresher {
   private def unsetInputSegments(schema: MVSchema): Unit = {
     val relatedTableIdentifiers = schema.getRelatedTables
     for (relationIdentifier <- relatedTableIdentifiers.asScala) {
-      CarbonUtils
+      CarbonThreadUtil
         .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
                      relationIdentifier.getDatabaseName + "." +
                      relationIdentifier.getTableName)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 10335d9..a1273af 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -55,7 +55,7 @@ case class CarbonDatasourceHadoopRelation(
     FileFactory.getUpdatedFilePath(paths.head),
     CarbonEnv.getDatabaseName(caseInsensitiveMap.get("dbname"))(sparkSession),
     caseInsensitiveMap("tablename"))
-  CarbonUtils.updateSessionInfoToCurrentThread(sparkSession)
+  CarbonThreadUtil.updateSessionInfoToCurrentThread(sparkSession)
 
   @transient lazy val carbonRelation: CarbonRelation =
     CarbonEnv.getInstance(sparkSession).carbonMetaStore.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonThreadUtil.scala
similarity index 89%
rename from integration/spark/src/main/scala/org/apache/spark/sql/CarbonUtils.scala
rename to integration/spark/src/main/scala/org/apache/spark/sql/CarbonThreadUtil.scala
index 9826666..25f9b21 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonThreadUtil.scala
@@ -19,13 +19,11 @@ package org.apache.spark.sql
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
 
 import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
 
-object CarbonUtils {
+object CarbonThreadUtil {
 
   private[sql] val threadStatementId = new ThreadLocal[Long]
 
@@ -81,10 +79,4 @@ object CarbonUtils {
       .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
   }
 
-  def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDatasourceHadoopRelation] = {
-    plan collect {
-      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-    }
-  }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 3bc2590..d22a1e0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -861,7 +861,7 @@ object CommonLoadUtils {
   def loadDataWithPartition(loadParams: CarbonLoadParams): Seq[Row] = {
     val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val catalogTable: CatalogTable = loadParams.logicalPartitionRelation.catalogTable.get
-    CarbonUtils.threadSet("partition.operationcontext", loadParams.operationContext)
+    CarbonThreadUtil.threadSet("partition.operationcontext", loadParams.operationContext)
     val attributes = if (loadParams.scanResultRDD.isDefined) {
       // take the already re-arranged attributes
       catalogTable.schema.toAttributes
@@ -1059,7 +1059,7 @@ object CommonLoadUtils {
         LOGGER.error(ex)
         throw ex
     } finally {
-      CarbonUtils.threadUnset("partition.operationcontext")
+      CarbonThreadUtil.threadUnset("partition.operationcontext")
       if (loadParams.isOverwriteTable) {
         IndexStoreManager.getInstance().clearIndex(table.getAbsoluteTableIdentifier)
         // Clean the overwriting segments if any.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index 6aa8c88..b026e4d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.{Job, JobID, TaskAttemptID, TaskID, TaskType}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, CarbonUtils, Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, CarbonThreadUtil, Column, DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.avro.AvroFileFormatFactory
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -49,7 +49,6 @@ import org.apache.carbondata.core.index.Segment
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.OperationContext
 import org.apache.carbondata.processing.loading.FailureCauses
@@ -79,7 +78,7 @@ case class CarbonMergeDataSetCommand(
    *
    */
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val relations = CarbonUtils.collectCarbonRelation(targetDsOri.logicalPlan)
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDsOri.logicalPlan)
     // Target dataset must be backed by carbondata table.
     if (relations.length != 1) {
       throw new UnsupportedOperationException(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
index aa1c05f..a072b26 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.parser
 
-import org.apache.spark.sql.{CarbonEnv, CarbonUtils, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, SparkSession}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlParser
@@ -42,7 +42,7 @@ class CarbonExtensionSqlParser(
     parser.synchronized {
       CarbonEnv.getInstance(sparkSession)
     }
-    CarbonUtils.updateSessionInfoToCurrentThread(sparkSession)
+    CarbonThreadUtil.updateSessionInfoToCurrentThread(sparkSession)
     try {
       val plan = parser.parse(sqlText)
       plan
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index f1d8f26..82d24f9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 
 import org.antlr.v4.runtime.tree.TerminalNode
-import org.apache.spark.sql.{CarbonSession, CarbonUtils, SparkSession}
+import org.apache.spark.sql.{CarbonSession, CarbonThreadUtil, SparkSession}
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -45,7 +45,7 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Sp
   private val substitutor = new VariableSubstitution(conf)
 
   override def parsePlan(sqlText: String): LogicalPlan = {
-    CarbonUtils.updateSessionInfoToCurrentThread(sparkSession)
+    CarbonThreadUtil.updateSessionInfoToCurrentThread(sparkSession)
     try {
       val parsedPlan = super.parsePlan(sqlText)
       CarbonScalaUtil.cleanParserThreadLocals
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
index dba8ff2..8a63351 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.secondaryindex.optimizer
 
 import org.apache.log4j.Logger
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonUtils, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonThreadUtil, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -29,6 +29,7 @@ import org.apache.spark.util.SparkUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 /**
  * Rule for rewriting plan if query has a filter on index table column
@@ -61,7 +62,7 @@ class CarbonSITransformationRule(sparkSession: SparkSession)
 
   private def checkIfRuleNeedToBeApplied(plan: LogicalPlan): Boolean = {
     var isRuleNeedToBeApplied = false
-    val relations = CarbonUtils.collectCarbonRelation(plan)
+    val relations = CarbonSparkUtil.collectCarbonRelation(plan)
     val isCreateAsSelect = isCreateTableAsSelect(plan)
     if (relations.nonEmpty && !isCreateAsSelect) {
       plan.collect {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 582ecfe..1d9b06b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.rdd.{CarbonMergeFilesRDD, RDD}
-import org.apache.spark.sql.{functions, CarbonEnv, CarbonUtils, DataFrame, SparkSession, SQLContext}
+import org.apache.spark.sql.{functions, CarbonEnv, CarbonThreadUtil, DataFrame, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction}
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -582,7 +582,7 @@ object SecondaryIndexCreator {
       projections: String,
       segments: Array[String]): DataFrame = {
     try {
-      CarbonUtils.threadSet(
+      CarbonThreadUtil.threadSet(
         CarbonCommonConstants.CARBON_INPUT_SEGMENTS + carbonTable.getDatabaseName +
         CarbonCommonConstants.POINT + carbonTable.getTableName, segments.mkString(","))
       val logicalPlan = sparkSession.sql(
@@ -609,7 +609,7 @@ object SecondaryIndexCreator {
       tableProperties.put("isPositionIDRequested", "true")
       SparkSQLUtil.execute(newLogicalPlan, sparkSession)
     } finally {
-      CarbonUtils
+      CarbonThreadUtil
         .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
                      carbonTable.getDatabaseName + CarbonCommonConstants.POINT +
                      carbonTable.getTableName)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
index 1aeea03..07f48cd 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
@@ -23,7 +23,7 @@ import scala.concurrent.{Await, Future}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.Duration
 
-import org.apache.spark.sql.{CarbonUtils, Row}
+import org.apache.spark.sql.{CarbonThreadUtil, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -59,31 +59,31 @@ class TestSegmentReadingForMultiThreading extends QueryTest with BeforeAndAfterA
   }
 
   test("test multithreading for segment reading") {
-    CarbonUtils.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,2,3")
+    CarbonThreadUtil.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,2,3")
     val df = sql("select count(empno) from carbon_table_MulTI_THread")
     checkAnswer(df, Seq(Row(30)))
 
     val four = Future {
-      CarbonUtils.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,3")
+      CarbonThreadUtil.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,3")
       val df = sql("select count(empno) from carbon_table_MulTI_THread")
       checkAnswer(df, Seq(Row(20)))
     }
 
     val three = Future {
-      CarbonUtils.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "0,1,2")
+      CarbonThreadUtil.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "0,1,2")
       val df = sql("select count(empno) from carbon_table_MulTI_THread")
       checkAnswer(df, Seq(Row(30)))
     }
 
 
     val one = Future {
-      CarbonUtils.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "0,2")
+      CarbonThreadUtil.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "0,2")
       val df = sql("select count(empno) from carbon_table_MulTI_THread")
       checkAnswer(df, Seq(Row(20)))
     }
 
     val two = Future {
-      CarbonUtils.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1")
+      CarbonThreadUtil.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1")
       val df = sql("select count(empno) from carbon_table_MulTI_THread")
       checkAnswer(df, Seq(Row(10)))
     }
@@ -94,7 +94,7 @@ class TestSegmentReadingForMultiThreading extends QueryTest with BeforeAndAfterA
 
   override def afterAll: Unit = {
     sql("DROP TABLE IF EXISTS carbon_table_MulTI_THread")
-    CarbonUtils.threadUnset("carbon.input.segments.default.carbon_table_MulTI_THread")
+    CarbonThreadUtil.threadUnset("carbon.input.segments.default.carbon_table_MulTI_THread")
     CarbonProperties.getInstance()
       .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
   }
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/query/TestFilterReordering.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/query/TestFilterReordering.scala
index 3cee00e..078bffb 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/query/TestFilterReordering.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/query/TestFilterReordering.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.carbondata.query
 
-import org.apache.spark.sql.{CarbonEnv, CarbonUtils}
+import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil}
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.sources.{And, EqualTo, Filter, Or}
 import org.apache.spark.sql.test.util.QueryTest
@@ -59,7 +59,7 @@ class TestFilterReordering extends QueryTest with BeforeAndAfterAll{
 
   test("test disabling filter reordering") {
     sqlContext.sparkSession.sql(s"set ${CarbonCommonConstants.CARBON_REORDER_FILTER}=false")
-    CarbonUtils.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
+    CarbonThreadUtil.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
     val filter1 = Or(And(EqualTo("four", 11), EqualTo("two", 11)), EqualTo("one", 11))
     val table = CarbonEnv.getCarbonTable(None, "filter_reorder")(sqlContext.sparkSession)
     val d: Array[Filter] = CarbonFilters.reorderFilter(Array(filter1), table)
@@ -69,7 +69,7 @@ class TestFilterReordering extends QueryTest with BeforeAndAfterAll{
 
   override protected def afterAll(): Unit = {
     sqlContext.sparkSession.sql(s"set ${CarbonCommonConstants.CARBON_REORDER_FILTER}=true")
-    CarbonUtils.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
+    CarbonThreadUtil.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
     sql("drop table if exists filter_reorder")
   }
 }