You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@carbondata.apache.org by GitBox <gi...@apache.org> on 2021/06/07 13:48:44 UTC

[GitHub] [carbondata] akashrn5 commented on a change in pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

akashrn5 commented on a change in pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#discussion_r646379260



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
##########
@@ -72,7 +72,7 @@ abstract class CarbonRDD[T: ClassTag](
   def internalCompute(split: Partition, context: TaskContext): Iterator[T]
 
   final def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
+    SparkVersionAdapter.addTaskCompletionListener(ThreadLocalSessionInfo.unsetAll())

Review comment:
       please add the Scala bug reference to the comment on the corresponding method in `SparkVersionAdapter`

##########
File path: integration/spark/src/main/common2.4and3.1/org/apache/spark/sql/CarbonBoundReference.scala
##########
@@ -42,4 +42,4 @@ case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nu
   override def qualifier: Seq[String] = null
 
   override def newInstance(): NamedExpression = throw new UnsupportedOperationException
-}
\ No newline at end of file
+}

Review comment:
       revert this

##########
File path: integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.mutable
+
+import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, Expression, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.codegen.{ExprCode, GeneratePredicate}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, QueryContext, SkewSpecContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoTable, Join, LogicalPlan, OneRowRelation, Statistics}
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.{QueryExecution, ShuffledRowRDD, SparkPlan, SQLExecution, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, RefreshTable}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.{checkIfDuplicateColumnExists, convertDbNameToLowerCase, validateStreamingProperty}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{DataType, StructField}
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.CarbonOption
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+object SparkVersionAdapter {

Review comment:
       please select the code from line 63 to the end of the file and correct the style (Ctrl + Shift + Alt + L)

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.parser
+
+import scala.collection.mutable
+
+import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.sql.{CarbonThreadUtil, SparkSession, SparkVersionAdapter}
+import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, SqlBaseParser}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SparkSqlAstBuilder, SparkSqlParser}
+import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.util.CarbonReflectionUtils
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * Concrete parser for Spark SQL statements and carbon specific
+ * statements
+ */
+class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends SparkSqlParser {
+
+  val parser = new CarbonSpark2SqlParser
+
+  override val astBuilder = CarbonReflectionUtils.getAstBuilder(conf, parser, sparkSession)
+
+  private val substitutor = new VariableSubstitution
+
+  override def parsePlan(sqlText: String): LogicalPlan = {
+    CarbonThreadUtil.updateSessionInfoToCurrentThread(sparkSession)
+    try {
+      val parsedPlan = super.parsePlan(sqlText)
+      CarbonScalaUtil.cleanParserThreadLocals
+      parsedPlan
+    } catch {
+      case ce: MalformedCarbonCommandException =>
+        CarbonScalaUtil.cleanParserThreadLocals
+        throw ce
+      case ex: Throwable =>
+        try {
+          parser.parse(sqlText)
+        } catch {
+          case mce: MalformedCarbonCommandException =>
+            throw mce
+          case e: Throwable =>
+            CarbonException.analysisException(
+              s"""== Parse1 ==
+                 |${ex.getMessage}
+                 |== Parse2 ==
+                 |${e.getMessage}
+               """.stripMargin.trim)
+        }
+    }
+  }
+
+  protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
+    super.parse(substitutor.substitute(command))(toResult)
+  }
+}
+
+class CarbonHelperSqlAstBuilder(conf: SQLConf,
+    parser: CarbonSpark2SqlParser,
+    sparkSession: SparkSession)
+  extends SparkSqlAstBuilderWrapper(conf) {
+  /**
+   * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
+   */
+  override def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
+    val props = visitTablePropertyList(ctx)
+    CarbonSparkSqlParserUtil.visitPropertyKeyValues(ctx, props)
+  }
+
+  def getPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String]
+  = {
+    Option(ctx).map(visitPropertyKeyValues)
+      .getOrElse(Map.empty)
+  }
+
+  def createCarbonTable(createTableTuple: (CreateTableHeaderContext, SkewSpecContext,
+    BucketSpecContext, PartitionFieldListContext, ColTypeListContext, TablePropertyListContext,
+    LocationSpecContext, Option[String], TerminalNode, QueryContext, String)): LogicalPlan = {
+    // val parser = new CarbonSpark2SqlParser

Review comment:
       remove this line

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+import java.sql.{Date, Timestamp}
+import java.time.ZoneId
+import javax.xml.bind.DatatypeConverter
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
+import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, AttributeSet, Expression, ExpressionSet, ExprId, NamedExpression, Predicate, ScalaUDF, SortOrder, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, Optimizer}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType, Statistics, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, ShuffledRowRDD, SimpleMode, SparkPlan, SQLExecution, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, RefreshTableCommand, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.{checkIfDuplicateColumnExists, convertDbNameToLowerCase, validateStreamingProperty}
+import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{AbstractDataType, CharType, DataType, Metadata, StringType, StructField, VarcharType}
+import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.geo.{InPolygonJoinUDF, ToRangeListAsStringUDF}
+import org.apache.carbondata.spark.CarbonOption
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+object SparkVersionAdapter {
+
+  def getExplainCommandObj(logicalPlan: LogicalPlan = OneRowRelation(),
+                           mode: Option[String]) : ExplainCommand = {
+    ExplainCommand(logicalPlan, ExplainMode.fromString(mode.getOrElse(SimpleMode.name)))
+  }
+
+  def getExplainCommandObj(mode: Option[String]) : ExplainCommand = {
+    ExplainCommand(OneRowRelation(), ExplainMode.fromString(mode.getOrElse(SimpleMode.name)))
+  }
+
+  /**
+   * As a part of SPARK-24085, Hive tables support scalar subqueries for
+   * partitioned tables, so Carbon also needs to support them
+   * @param partitionSet
+   * @param filterPredicates
+   * @return
+   */
+  def getPartitionFilter(
+                          partitionSet: AttributeSet,
+                          filterPredicates: Seq[Expression]): Seq[Expression] = {

Review comment:
       correct the style here, then select the whole file and correct the style throughout

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
##########
@@ -1551,26 +1552,28 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
             sql("insert into hive_table select 'a','b','1'");
         }
 
-        val e2 = intercept[SparkException] {
-            sql("insert into hive_table2 select 'a','b','binary'");
-        }
+        if (!SparkUtil.isSparkVersionXAndAbove("3")) {
+            val e2 = intercept[SparkException] {
+                sql("insert into hive_table2 select 'a','b','binary'");
+            }
 
-        assert(e2.getMessage.contains(
-            "Dynamic partition strict mode requires at least one static partition column"))
+            assert(e2.getMessage.contains(
+                "Dynamic partition strict mode requires at least one static partition column"))
 
-        val eInt2 = intercept[Exception] {
-            sql("insert into hive_table2 select 'a','b','1'");
-        }
+            val eInt2 = intercept[Exception] {
+                sql("insert into hive_table2 select 'a','b','1'");
+            }
 
-        val e3 = intercept[SparkException] {
-            sql("insert into parquet_table select 'a','b','binary'");
-        }
+            val e3 = intercept[SparkException] {
+                sql("insert into parquet_table select 'a','b','binary'");
+            }
 
-        assert(e3.getMessage.contains(
-            "Dynamic partition strict mode requires at least one static partition column"))
+            assert(e3.getMessage.contains(
+                "Dynamic partition strict mode requires at least one static partition column"))
 
-        val eInt3 = intercept[Exception] {
-            sql("insert into parquet_table select 'a','b','1'");
+            val eInt3 = intercept[Exception] {
+                sql("insert into parquet_table select 'a','b','1'");
+            }

Review comment:
       revert

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
##########
@@ -169,7 +128,7 @@ case class CarbonDataSourceScan(
   override protected def doCanonicalize(): CarbonDataSourceScan = {
     CarbonDataSourceScan(
       relation,
-      output.map(QueryPlan.normalizeExprId(_, output)),
+      a,

Review comment:
       please rename this to a proper variable name, like `outputAttributesAfterNormalizingExpressionIds`

##########
File path: mv/plan/src/main/spark3.1/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeSeq, Expression, ExprId, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, AggregateMode}
+import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, CollapseProject, CollapseRepartition, CollapseWindow, ColumnPruning, CombineFilters, CombineUnions, ConstantFolding, EliminateLimits, EliminateOuterJoin, EliminateSerialization, EliminateSorts, FoldablePropagation, NullPropagation, PushDownPredicates, PushPredicateThroughJoin, PushProjectionThroughUnion, RemoveDispensableExpressions, RemoveRedundantAliases, ReorderAssociativeOperator, ReorderJoin, RewriteCorrelatedScalarSubquery, SimplifyBinaryComparison, SimplifyCaseConversionExpressions, SimplifyCasts, SimplifyConditionals}
+import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, LogicalPlan, Statistics, Subquery}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+import org.apache.carbondata.mv.plans.util.BirdcageOptimizer
+
+object SparkVersionHelper {
+  def getStatisticsObj(outputList: Seq[NamedExpression],
+      plan: LogicalPlan, stats: Statistics,
+      aliasMap: Option[AttributeMap[Attribute]] = None): Statistics = {
+    val output = outputList.map(_.toAttribute)
+    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+      table => AttributeMap(table.output.zip(output))
+    }
+    val rewrites = mapSeq.head
+    val attributes: AttributeMap[ColumnStat] = stats.attributeStats
+    var attributeStats = AttributeMap(attributes.iterator
+      .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+    if (aliasMap.isDefined) {
+      attributeStats = AttributeMap(
+        attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
+    }
+    Statistics(stats.sizeInBytes, stats.rowCount, attributeStats)
+  }
+
+  def getOptimizedPlan(s: SubqueryExpression): LogicalPlan = {
+    val Subquery(newPlan, _) = BirdcageOptimizer.execute(Subquery.fromExpression(s))
+    newPlan
+  }
+
+  def normalizeExpressions[T <: Expression](r: T, attrs: AttributeSeq): T = {
+    QueryPlan.normalizeExpressions(r, attrs)
+  }
+
+  def attributeMap(rAliasMap: AttributeMap[Attribute]) : AttributeMap[Alias] = {
+    rAliasMap.asInstanceOf[AttributeMap[Alias]]
+  }
+
+  def seqOfRules : Seq[Rule[LogicalPlan]] = {
+    Seq(
+      // Operator push down
+      PushProjectionThroughUnion,
+      ReorderJoin,
+      EliminateOuterJoin,
+      PushPredicateThroughJoin,
+      PushDownPredicates,
+      ColumnPruning,
+      // Operator combine
+      CollapseRepartition,
+      CollapseProject,
+      CollapseWindow,
+      CombineFilters,
+      EliminateLimits,
+      CombineUnions,
+      // Constant folding and strength reduction
+      NullPropagation,
+      FoldablePropagation,
+      ConstantFolding,
+      ReorderAssociativeOperator,
+      // No need to apply LikeSimplification rule while creating MV
+      // as modular plan asCompactSql will be set in schema
+      //        LikeSimplification,
+      BooleanSimplification,
+      SimplifyConditionals,
+      RemoveDispensableExpressions,
+      SimplifyBinaryComparison,
+      EliminateSorts,
+      SimplifyCasts,
+      SimplifyCaseConversionExpressions,
+      RewriteCorrelatedScalarSubquery,
+      EliminateSerialization,
+      RemoveRedundantAliases)
+  }
+
+}
+
+trait getVerboseString extends LeafNode {
+  def verboseString: String = toString
+}
+
+trait groupByUnaryNode extends UnaryNode {
+  override def verboseString(maxFields: Int): String = super.verboseString(maxFields)
+
+  override def mapProductIterator[B](f: Any => B)(implicit evidence$1: ClassTag[B]): Array[B] = {
+    super.mapProductIterator(f)
+  }
+
+  override def mapChildren(f: ModularPlan => ModularPlan) : ModularPlan = {
+    val groupBy = super.mapChildren(f)
+    if (this.rewritten && !groupBy.rewritten) {
+      groupBy.setRewritten()
+    }
+    groupBy
+  }
+}
+
+trait selectModularPlan extends ModularPlan {
+  override def verboseString(maxFields: Int): String = super.verboseString(maxFields)
+
+  override def mapProductIterator[B](f: Any => B)(implicit evidence$1: ClassTag[B]): Array[B] = {
+    super.mapProductIterator(f)
+  }
+
+  override def mapChildren(f: ModularPlan => ModularPlan) : ModularPlan = {
+    val select = super.mapChildren(f)
+    if (this.rewritten && !select.rewritten) {
+      select.setRewritten()
+    }
+    select
+  }
+}
+
+trait unionModularPlan extends ModularPlan {

Review comment:
       please change all trait names, according to @kunal642's comment above

##########
File path: integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.CarbonInputMetrics
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression => SparkExpression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.execution.{ColumnarBatchScan, DataSourceScanExec}
+import org.apache.spark.sql.execution.strategy.CarbonPlanHelper
+import org.apache.spark.sql.optimizer.CarbonFilters
+
+import org.apache.carbondata.core.index.IndexFilter
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+abstract class  CarbonDataSourceScanHelper(relation: CarbonDatasourceHadoopRelation,
+                                           output: Seq[Attribute],
+    partitionFilters: Seq[SparkExpression],
+    pushedDownFilters: Seq[Expression],
+    pushedDownProjection: CarbonProjection,
+    directScanSupport: Boolean,
+    extraRDD: Option[(RDD[InternalRow], Boolean)],
+    segmentIds: Option[String])
+  extends DataSourceScanExec with ColumnarBatchScan {

Review comment:
       please correct the style of the class definition; please also check all other places, in case I missed some

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+import java.sql.{Date, Timestamp}
+import java.time.ZoneId
+import javax.xml.bind.DatatypeConverter
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
+import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, AttributeSet, Expression, ExprId, NamedExpression, Predicate, ScalaUDF, SortOrder, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, Optimizer}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType, Statistics, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, ShuffledRowRDD, SimpleMode, SparkPlan, SQLExecution, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, RefreshTableCommand, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
+import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule
+import org.apache.spark.sql.types.{AbstractDataType, CharType, DataType, Metadata, StringType, StructField, VarcharType}
+import org.apache.spark.sql.util.SparkSQLUtil

Review comment:
       there are so many unused imports; please remove them, check the other places as well, and also check why the CI didn't fail for this

##########
File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
##########
@@ -21,21 +21,20 @@ import java.util.concurrent.Callable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.execution.strategy.{CarbonSourceStrategy, DDLStrategy, DMLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 

Review comment:
       revert changes if not required

##########
File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala
##########
@@ -18,8 +18,8 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.types.DataType
 

Review comment:
       revert this change

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
##########
@@ -544,12 +545,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
 
   protected lazy val explainPlan: Parser[LogicalPlan] =
-    (EXPLAIN ~> opt(EXTENDED)) ~ start ^^ {
-      case isExtended ~ logicalPlan =>
+    (EXPLAIN ~> opt(MODE)) ~ start ^^ {
+      case mode ~ logicalPlan =>

Review comment:
       can you please add test cases with different modes?

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+import java.sql.{Date, Timestamp}
+import java.time.ZoneId
+import javax.xml.bind.DatatypeConverter
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
+import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, AttributeSet, Expression, ExpressionSet, ExprId, NamedExpression, Predicate, ScalaUDF, SortOrder, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, Optimizer}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType, Statistics, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, ShuffledRowRDD, SimpleMode, SparkPlan, SQLExecution, UnaryExecNode}

Review comment:
       many unused imports, please remove

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
##########
@@ -298,7 +301,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     for (i <- 0 until 10) {
       sql(s"alter table addsegment1 add segment " +
           s"options('path'='${ newPath + i }', 'format'='carbon')").collect()
-
+      sql("select count(*) from addsegment1").show()

Review comment:
       revert this

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
##########
@@ -1551,26 +1552,28 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
             sql("insert into hive_table select 'a','b','1'");
         }
 
-        val e2 = intercept[SparkException] {
-            sql("insert into hive_table2 select 'a','b','binary'");
-        }
+        if (!SparkUtil.isSparkVersionXAndAbove("3")) {
+            val e2 = intercept[SparkException] {
+                sql("insert into hive_table2 select 'a','b','binary'");
+            }
 
-        assert(e2.getMessage.contains(
-            "Dynamic partition strict mode requires at least one static partition column"))
+            assert(e2.getMessage.contains(
+                "Dynamic partition strict mode requires at least one static partition column"))
 
-        val eInt2 = intercept[Exception] {
-            sql("insert into hive_table2 select 'a','b','1'");
-        }
+            val eInt2 = intercept[Exception] {
+                sql("insert into hive_table2 select 'a','b','1'");
+            }
 
-        val e3 = intercept[SparkException] {
-            sql("insert into parquet_table select 'a','b','binary'");
-        }
+            val e3 = intercept[SparkException] {
+                sql("insert into parquet_table select 'a','b','binary'");
+            }
 
-        assert(e3.getMessage.contains(
-            "Dynamic partition strict mode requires at least one static partition column"))
+            assert(e3.getMessage.contains(
+                "Dynamic partition strict mode requires at least one static partition column"))

Review comment:
       revert all the above space changes

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##########
@@ -100,6 +100,7 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
   test("clean up table and test trash folder with Marked For Delete and Compacted segments") {
     // do not send MFD folders to trash
     createTable()
+    sql(s"""Show Tables """).show()

Review comment:
       revert this

##########
File path: mv/plan/src/main/common2.3and2.4/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, Expression, ExprId, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, CollapseProject, CollapseRepartition, CollapseWindow, ColumnPruning, CombineFilters, CombineLimits, CombineUnions, ConstantFolding, EliminateOuterJoin, EliminateSerialization, EliminateSorts, FoldablePropagation, NullPropagation, PushDownPredicate, PushPredicateThroughJoin, PushProjectionThroughUnion, RemoveDispensableExpressions, RemoveRedundantAliases, RemoveRedundantProject, ReorderAssociativeOperator, ReorderJoin, RewriteCorrelatedScalarSubquery, SimplifyBinaryComparison, SimplifyCaseConversionExpressions, SimplifyCasts, SimplifyConditionals}
+import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, LogicalPlan, Statistics, Subquery}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+import org.apache.carbondata.mv.plans.util.BirdcageOptimizer
+
+
+object SparkVersionHelper {
+
+  def getStatisticsObj(outputList: Seq[NamedExpression],
+                       plan: LogicalPlan, stats: Statistics,
+                       aliasMap: Option[AttributeMap[Attribute]] = None): Statistics = {
+    val output = outputList.map(_.toAttribute)

Review comment:
       Please correct the code style here and apply the same correction to all the code in this class.

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
##########
@@ -842,7 +842,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
       Row("d", "3")
     ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
 
-    initframe.write
+    initframe.repartition(1).write

Review comment:
       Why is this change needed?

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+import java.sql.{Date, Timestamp}
+import java.time.ZoneId
+import javax.xml.bind.DatatypeConverter
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
+import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, AttributeSet, Expression, ExpressionSet, ExprId, NamedExpression, Predicate, ScalaUDF, SortOrder, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, Optimizer}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType, Statistics, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, ShuffledRowRDD, SimpleMode, SparkPlan, SQLExecution, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, RefreshTableCommand, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.{checkIfDuplicateColumnExists, convertDbNameToLowerCase, validateStreamingProperty}
+import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{AbstractDataType, CharType, DataType, Metadata, StringType, StructField, VarcharType}
+import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.geo.{InPolygonJoinUDF, ToRangeListAsStringUDF}
+import org.apache.carbondata.spark.CarbonOption
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+object SparkVersionAdapter {
+
+  /**
+   * Builds an ExplainCommand for the given plan.
+   *
+   * @param logicalPlan plan to explain; defaults to OneRowRelation()
+   * @param mode        name of the explain mode; SimpleMode.name is used when it is None
+   * @return the ExplainCommand wrapping the plan with the resolved mode
+   */
+  def getExplainCommandObj(
+      logicalPlan: LogicalPlan = OneRowRelation(),
+      mode: Option[String]): ExplainCommand = {
+    val explainMode = ExplainMode.fromString(mode.getOrElse(SimpleMode.name))
+    ExplainCommand(logicalPlan, explainMode)
+  }
+
+  /**
+   * Builds an ExplainCommand over OneRowRelation().
+   *
+   * @param mode name of the explain mode; SimpleMode.name is used when it is None
+   */
+  def getExplainCommandObj(mode: Option[String]): ExplainCommand = {
+    val modeName = mode.getOrElse(SimpleMode.name)
+    ExplainCommand(OneRowRelation(), ExplainMode.fromString(modeName))
+  }
+
+  /**
+   * As part of SPARK-24085, Hive tables support scalar subqueries for partitioned
+   * tables, so Carbon needs to support them as well.
+   *
+   * Keeps only the predicates that contain no subquery, reference at least one attribute and
+   * reference nothing outside `partitionSet`.
+   *
+   * @param partitionSet     attributes of the partition columns
+   * @param filterPredicates candidate filter expressions
+   * @return the subset of `filterPredicates` that satisfies the conditions above
+   */
+  def getPartitionFilter(
+      partitionSet: AttributeSet,
+      filterPredicates: Seq[Expression]): Seq[Expression] = {
+    filterPredicates.filter { predicate =>
+      !SubqueryExpression.hasSubquery(predicate) &&
+        predicate.references.nonEmpty &&
+        predicate.references.subsetOf(partitionSet)
+    }
+  }
+
+  /** Returns `filter` unchanged; `partitionSet` is not consulted by this implementation. */
+  def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression]): Seq[Expression] =
+    filter
+
+  /**
+   * As per SPARK-22520, OptimizeCodegen is removed in 2.3.1, so no rule is returned here.
+   */
+  def getOptimizeCodegenRule(): Seq[Rule[LogicalPlan]] = Seq.empty[Rule[LogicalPlan]]
+
+  /**
+   * Copies `storageFormat`, replacing its properties with `map` and its location with the
+   * URI built from `tablePath`.
+   */
+  def getUpdatedStorageFormat(
+      storageFormat: CatalogStorageFormat,
+      map: Map[String, String],
+      tablePath: String): CatalogStorageFormat = {
+    val location = new URI(tablePath)
+    storageFormat.copy(locationUri = Some(location), properties = map)
+  }
+
+  /** Returns the child's output attributes, each qualified with the alias identifier name. */
+  def getOutput(subQueryAlias: SubqueryAlias): Seq[Attribute] = {
+    val qualifier = Seq(subQueryAlias.identifier.name)
+    subQueryAlias.child.output.map { attribute =>
+      attribute.withQualifier(qualifier)
+    }
+  }
+
+  /**
+   * Parses `timestamp` with DateTimeUtils.stringToTimestamp using the system default zone.
+   *
+   * @return the parsed value wrapped in an Option, exactly as DateTimeUtils returns it
+   */
+  def stringToTimestamp(timestamp: String): Option[Long] = {
+    val utf8Value = UTF8String.fromString(timestamp)
+    DateTimeUtils.stringToTimestamp(utf8Value, ZoneId.systemDefault())
+  }
+
+  /** Converts a date/time string to a java.util.Date by delegating to stringToDateValue. */
+  def stringToTime(value: String): java.util.Date = stringToDateValue(value)
+
+  /**
+   * Builds a row predicate from an optional condition.
+   *
+   * @param inputSchema attributes the condition is bound against
+   * @param condition   expression to evaluate; when it is None every row is accepted instead of
+   *                    failing with NoSuchElementException on `condition.get`
+   * @return a function that evaluates the condition against an InternalRow
+   */
+  def getPredicate(
+      inputSchema: Seq[Attribute],
+      condition: Option[Expression]): InternalRow => Boolean = {
+    condition match {
+      case Some(expression) => Predicate.create(expression, inputSchema).eval _
+      // no condition means there is nothing to reject
+      case None => (_: InternalRow) => true
+    }
+  }
+
+  /**
+   * Converts a date/time string into a java.util.Date.
+   *
+   * Strings containing "GMT" have that marker removed and are parsed again; strings without a
+   * 'T' are treated as JDBC escape strings (Timestamp when they contain a space, Date
+   * otherwise); everything else goes through DatatypeConverter.parseDateTime.
+   */
+  @tailrec
+  private def stringToDateValue(value: String): java.util.Date = {
+    val gmtPosition = value.indexOf("GMT")
+    if (gmtPosition == -1) {
+      if (value.contains('T')) {
+        DatatypeConverter.parseDateTime(value).getTime
+      } else if (value.contains(' ')) {
+        // JDBC escape string with a time part
+        Timestamp.valueOf(value)
+      } else {
+        // JDBC escape string with a date part only
+        Date.valueOf(value)
+      }
+    } else {
+      // ISO8601 with a weird time zone specifier (2000-01-01T00:00GMT+01:00),
+      // mapped to 2000-01-01T00:00+01:00 by dropping the three "GMT" characters
+      val (beforeGmt, fromGmt) = value.splitAt(gmtPosition)
+      stringToDateValue(beforeGmt + fromGmt.substring(3))
+    }
+  }
+
+  /** Formats `timeStamp` with the fraction formatter of the system default zone. */
+  def timeStampToString(timeStamp: Long): String = {
+    val formatter = TimestampFormatter.getFractionFormatter(ZoneId.systemDefault())
+    formatter.format(timeStamp)
+  }
+
+  /** Renders `date` via DateTimeUtils.daysToLocalDate followed by toString. */
+  def dateToString(date: Int): String = {
+    val localDate = DateTimeUtils.daysToLocalDate(date)
+    localDate.toString
+  }
+
+  /**
+   * Registers `f` to be evaluated when the current task completes.
+   *
+   * The `[Unit]` type argument is passed explicitly on purpose.
+   * NOTE(review): this is presumably the workaround for the Scala 2.12 overload ambiguity of
+   * TaskContext.addTaskCompletionListener when it is given a bare lambda - confirm and link
+   * the Scala/Spark issue here.
+   *
+   * @param f by-name block evaluated inside the completion listener; its result is discarded
+   */
+  def addTaskCompletionListener[U](f: => U): Unit = {
+    TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+      f
+    }
+  }
+
+  /**
+   * Derives a TableIdentifier from an unresolved relation.
+   *
+   * The name parts are read from `multipartIdentifier` directly rather than by splitting the
+   * rendered `tableName` on ".", so a part that itself contains a dot is not torn apart.
+   *
+   * @param u relation whose name is converted
+   * @return for a two-part name, the second part as table and the first as database;
+   *         otherwise the first part as table in the session's current database
+   */
+  def getTableIdentifier(u: UnresolvedRelation): Some[TableIdentifier] = {
+    val nameParts = u.multipartIdentifier
+    if (nameParts.size == 2) {
+      Some(TableIdentifier(nameParts(1), Option(nameParts.head)))
+    } else {
+      val currentDatabase = SparkSQLUtil.getSparkSession.sessionState.catalog.getCurrentDatabase
+      Some(TableIdentifier(nameParts.head, Option(currentDatabase)))
+    }
+  }
+
+  /**
+   * Wraps `toRangeListUDF` in a ScalaUDF named "ToRangeListAsString" with no input encoders
+   * and no output encoder. `inputTypes` is not used by this implementation.
+   */
+  def createRangeListScalaUDF(
+      toRangeListUDF: ToRangeListAsStringUDF,
+      dataType: StringType.type,
+      children: Seq[Expression],
+      inputTypes: Seq[DataType]): ScalaUDF = {
+    ScalaUDF(
+      toRangeListUDF,
+      dataType,
+      children,
+      inputEncoders = Nil,
+      outputEncoder = None,
+      udfName = Some("ToRangeListAsString"))
+  }
+
+  /**
+   * Builds a ScalaUDF around `polygonJoinUdf` with `udfChildren` as children, carrying over the
+   * data type, encoders and name of `scalaUdf`.
+   */
+  def getTransformedPolygonJoinUdf(
+      scalaUdf: ScalaUDF,
+      udfChildren: Seq[Expression],
+      polygonJoinUdf: InPolygonJoinUDF): ScalaUDF = {
+    ScalaUDF(
+      polygonJoinUdf,
+      scalaUdf.dataType,
+      udfChildren,
+      inputEncoders = scalaUdf.inputEncoders,
+      outputEncoder = scalaUdf.outputEncoder,
+      udfName = scalaUdf.udfName)
+  }
+
+  /**
+   * Builds a TableIdentifier from name parts: a single part is a table without database,
+   * otherwise the first part is the database and the second the table.
+   */
+  def getTableIdentifier(parts: Seq[String]): TableIdentifier = {
+    parts match {
+      case Seq(table) => TableIdentifier(table, None)
+      case _ => TableIdentifier(parts(1), Option(parts.head))
+    }
+  }
+
+  /**
+   * Creates a ShuffledRowRDD over `localTopK` using a SinglePartition shuffle dependency and
+   * freshly created shuffle write metrics.
+   */
+  def createShuffledRowRDD(
+      sparkContext: SparkContext,
+      localTopK: RDD[InternalRow],
+      child: SparkPlan,
+      serializer: Serializer): ShuffledRowRDD = {
+    val shuffleWriteMetrics =
+      SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+    val dependency = ShuffleExchangeExec.prepareShuffleDependency(
+      localTopK, child.output, SinglePartition, serializer, shuffleWriteMetrics)
+    new ShuffledRowRDD(dependency, shuffleWriteMetrics)
+  }
+
+  /**
+   * Runs `analyzer.executeAndCheck` on `plan`, using the current QueryPlanningTracker or a
+   * new one when none is set.
+   */
+  def invokeAnalyzerExecute(analyzer: Analyzer, plan: LogicalPlan): LogicalPlan = {
+    val tracker = QueryPlanningTracker.get.getOrElse(new QueryPlanningTracker)
+    analyzer.executeAndCheck(plan, tracker)
+  }
+
+  /** Delegates to QueryPlan.normalizeExpressions. */
+  def normalizeExpressions[T <: Expression](r: T, attrs: AttributeSeq): T =
+    QueryPlan.normalizeExpressions(r, attrs)
+
+  /** Returns the BuildRight build side. */
+  def getBuildRight: BuildSide = BuildRight
+
+  /** Returns the BuildLeft build side. */
+  def getBuildLeft: BuildSide = BuildLeft
+
+  /**
+   * Returns a function that passes its argument to SQLExecution.withNewExecutionId for
+   * `queryExecution` (with no name). `sparkSession` is not used by this implementation.
+   */
+  def withNewExecutionId[T](sparkSession: SparkSession, queryExecution: QueryExecution): T => T = {
+    (body: T) => SQLExecution.withNewExecutionId(queryExecution, None)(body)
+  }
+
+  /** Creates a Join of `child` and `targetTable` with the given type and condition, no hint. */
+  def createJoinNode(
+      child: LogicalPlan,
+      targetTable: LogicalPlan,
+      joinType: JoinType,
+      condition: Option[Expression]): Join = {
+    val noHint = JoinHint.NONE
+    Join(child, targetTable, joinType, condition, noHint)
+  }
+
+  /** Returns the partition spec carried by the insert statement. */
+  def getPartitionsFromInsert(x: InsertIntoStatementWrapper): Map[String, Option[String]] =
+    x.partitionSpec
+
+  type CarbonBuildSideType = BuildSide // alias for the BuildSide imported by this file
+  type InsertIntoStatementWrapper = InsertIntoStatement // alias for Spark's InsertIntoStatement
+
+  /** Creates a RefreshTableCommand for `tableIdentifier`. */
+  def createRefreshTableCommand(tableIdentifier: TableIdentifier): RefreshTableCommand =
+    RefreshTableCommand(tableIdentifier)
+
+  type RefreshTables = RefreshTableCommand // alias for Spark's RefreshTableCommand
+
+  /**
+   * Converts the partition struct fields into partitioner fields and rejects partition columns
+   * that are also declared in the table schema (compared case-insensitively).
+   *
+   * @param partitionColumns        parser context of the partition clause, used as the error
+   *                                location when validation fails
+   * @param colNames                names of the table columns
+   * @param tableProperties         table property map; not read by this method
+   * @param partitionByStructFields partition fields of the table
+   * @return one PartitionerField per entry of `partitionByStructFields`
+   */
+  def validatePartitionFields(
+      partitionColumns: PartitionFieldListContext,
+      colNames: Seq[String],
+      tableProperties: mutable.Map[String, String],
+      partitionByStructFields: Seq[StructField]): Seq[PartitionerField] = {
+    val partitionerFields = partitionByStructFields.map { field =>
+      PartitionerField(field.name, Some(field.dataType.toString), null)
+    }
+    // validate partition clause
+    if (partitionerFields.nonEmpty) {
+      // partition columns should not be part of the schema
+      val partitionNames = partitionerFields.map(_.partitionColumn.toLowerCase).toSet
+      val schemaNames = colNames.map(_.toLowerCase).toSet
+      val badPartCols = partitionNames.intersect(schemaNames)
+      if (badPartCols.nonEmpty) {
+        val quotedNames = badPartCols.map(name => "\"" + name + "\"").mkString("[", ",", "]")
+        operationNotAllowed(
+          "Partition columns should not be specified in the schema: " + quotedNames,
+          partitionColumns)
+      }
+    }
+    partitionerFields
+  }
+
+  /**
+   * Validates the create table command and returns the create table or
+   * ctas table LogicalPlan.
+   *
+   * For an external table the table info is read (or inferred) from the given table path;
+   * otherwise it is built from the parsed fields and properties.
+   *
+   * @param createTableTuple a tuple of (CreateTableHeaderContext, SkewSpecContext,
+   *                         BucketSpecContext, PartitionFieldListContext, ColTypeListContext,
+   *                         TablePropertyListContext,
+   *                         LocationSpecContext, Option[String], TerminalNode, QueryContext,
+   *                         String); only the header, partition columns, columns, property
+   *                         list, table comment and provider are read here
+   * @param extraTableTuple  A tuple of (Seq[StructField], Boolean, TableIdentifier, Boolean,
+   *                         Seq[String],
+   *                         Option[String], mutable.Map[String, String], Map[String, String],
+   *                         Seq[StructField],
+   *                         Seq[PartitionerField], CarbonSpark2SqlParser, SparkSession,
+   *                         Option[LogicalPlan])
+   * @return <LogicalPlan> of create table or ctas table
+   *
+   */
+  def createCarbonTable(createTableTuple: (CreateTableHeaderContext, SkewSpecContext,
+    BucketSpecContext, PartitionFieldListContext, ColTypeListContext, TablePropertyListContext,
+    LocationSpecContext, Option[String], TerminalNode, QueryContext, String),
+    extraTableTuple: (Seq[StructField], Boolean, TableIdentifier, Boolean, Seq[String],
+    Option[String], mutable.Map[String, String], Map[String, String], Seq[StructField],
+    Seq[PartitionerField], CarbonSpark2SqlParser, SparkSession,
+    Option[LogicalPlan])): LogicalPlan = {
+    // elements that are not read below are ignored with `_`
+    val (tableHeader, _, _, partitionColumns, columns,
+    tablePropertyList, _, tableComment, _, _, provider) = createTableTuple
+    val (cols, external, tableIdentifier, ifNotExists, colNames, tablePath,
+    tableProperties, properties, partitionByStructFields, partitionFields,
+    parser, sparkSession, selectQuery) = extraTableTuple
+    val options = new CarbonOption(properties)
+    // validate streaming property
+    validateStreamingProperty(options)
+    var fields = parser.getFields(cols ++ partitionByStructFields)
+    // validate for create table as select
+    selectQuery match {
+      case Some(q) =>
+        // create table as select does not allow creation of partitioned table
+        if (partitionFields.nonEmpty) {
+          val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
+            "create a partitioned table using Carbondata file formats."
+          operationNotAllowed(errorMessage, partitionColumns)
+        }
+        // create table as select does not allow to explicitly specify schema
+        if (fields.nonEmpty) {
+          operationNotAllowed(
+            "Schema may not be specified in a Create Table As Select (CTAS) statement", columns)
+        }
+        // external table is not allowed
+        if (external) {
+          operationNotAllowed("Create external table as select", tableHeader)
+        }
+        // the schema of a CTAS table comes from the select query
+        fields = parser
+          .getFields(CarbonEnv.getInstance(sparkSession).carbonMetaStore
+            .getSchemaFromUnresolvedRelation(sparkSession, q))
+      case _ =>
+      // ignore this case
+    }
+    val columnNames = fields.map(_.name.get)
+    checkIfDuplicateColumnExists(columns, tableIdentifier, columnNames)
+    if (partitionFields.nonEmpty && options.isStreaming) {
+      operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns)
+    }
+
+    if (!external && fields.isEmpty) {
+      throw new MalformedCarbonCommandException("Creating table without column(s) is not supported")
+    }
+    if (external && fields.isEmpty && tableProperties.nonEmpty) {
+      // as fields are always zero for external table, cannot validate table properties.
+      operationNotAllowed(
+        "Table properties are not supported for external table", tablePropertyList)
+    }
+
+    // Global dictionary is deprecated since 2.0
+    if (tableProperties.contains(CarbonCommonConstants.DICTIONARY_INCLUDE) ||
+      tableProperties.contains(CarbonCommonConstants.DICTIONARY_EXCLUDE)) {
+      DeprecatedFeatureException.globalDictNotSupported()
+    }
+
+    val bucketFields = parser.getBucketFields(tableProperties, fields, options)
+    var isTransactionalTable: Boolean = true
+
+    val tableInfo = if (external) {
+      if (fields.nonEmpty) {
+        // user provided schema for this external table, this is not allowed currently
+        // see CARBONDATA-2866
+        operationNotAllowed(
+          "Schema must not be specified for external table", columns)
+      }
+      if (partitionByStructFields.nonEmpty) {
+        operationNotAllowed(
+          "Partition is not supported for external table", partitionColumns)
+      }
+      // read table info from schema file in the provided table path
+      // external table also must convert table name to lower case
+      val identifier = AbsoluteTableIdentifier.from(
+        tablePath.get,
+        CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(),
+        tableIdentifier.table.toLowerCase())
+      val table = try {
+        val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
+        if (!FileFactory.isFileExist(schemaPath)) {
+          if (provider.equalsIgnoreCase("'carbonfile'")) {
+            SchemaReader.inferSchema(identifier, true)
+          } else {
+            isTransactionalTable = false
+            SchemaReader.inferSchema(identifier, false)
+          }
+        } else {
+          SchemaReader.getTableInfo(identifier)
+        }
+      } catch {
+        // only non-fatal failures are reported as an invalid path; fatal errors
+        // (for example OutOfMemoryError) are left to propagate
+        case scala.util.control.NonFatal(_) =>
+          operationNotAllowed(s"Invalid table path provided: ${ tablePath.get } ", tableHeader)
+      }
+
+      // set "_external" property, so that DROP TABLE will not delete the data
+      if (provider.equalsIgnoreCase("'carbonfile'")) {
+        table.getFactTable.getTableProperties.put("_filelevelformat", "true")
+        table.getFactTable.getTableProperties.put("_external", "false")
+      } else {
+        table.getFactTable.getTableProperties.put("_external", "true")
+        table.getFactTable.getTableProperties.put("_filelevelformat", "false")
+      }
+      // fall back to the system level setting when the table does not define
+      // the local dictionary property itself
+      var isLocalDic_enabled = table.getFactTable.getTableProperties
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
+      if (null == isLocalDic_enabled) {
+        table.getFactTable.getTableProperties
+          .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+            CarbonProperties.getInstance()
+              .getProperty(CarbonCommonConstants.LOCAL_DICTIONARY_SYSTEM_ENABLE,
+                CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT))
+      }
+      isLocalDic_enabled = table.getFactTable.getTableProperties
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
+      if (CarbonScalaUtil.validateLocalDictionaryEnable(isLocalDic_enabled) &&
+        isLocalDic_enabled.toBoolean) {
+        // mark every STRING and VARCHAR column as a local dictionary column
+        val allColumns = table.getFactTable.getListOfColumns
+        for (i <- 0 until allColumns.size()) {
+          val cols = allColumns.get(i)
+          if (cols.getDataType == DataTypes.STRING || cols.getDataType == DataTypes.VARCHAR) {
+            cols.setLocalDictColumn(true)
+          }
+        }
+        table.getFactTable.setListOfColumns(allColumns)
+      }
+      table
+    } else {
+      // prepare table model of the collected tokens
+      val tableModel: TableModel = CarbonParserUtil.prepareTableModel(
+        ifNotExists,
+        convertDbNameToLowerCase(tableIdentifier.database),
+        tableIdentifier.table.toLowerCase,
+        fields,
+        partitionFields,
+        tableProperties,
+        bucketFields,
+        isAlterFlow = false,
+        tableComment)
+      TableNewProcessor(tableModel)
+    }
+    tableInfo.setTransactionalTable(isTransactionalTable)
+    selectQuery match {
+      case Some(q) =>
+        CarbonCreateTableAsSelectCommand(
+          tableInfo = tableInfo,
+          query = q,
+          ifNotExistsSet = ifNotExists,
+          tableLocation = tablePath)
+      case _ =>
+        CarbonCreateTableCommand(
+          tableInfo = tableInfo,
+          ifNotExistsSet = ifNotExists,
+          tableLocation = tablePath,
+          external)
+    }
+  }
+
+  /**
+   * Converts every parsed column definition in `schema` into a carbon `Field`.
+   *
+   * Each column is handed to `parser.getFields` together with its comment, the head of
+   * its name parts, its data type and the `isExternal` flag.
+   */
+  def getField(parser: CarbonSpark2SqlParser,
+               schema: Seq[QualifiedColType], isExternal: Boolean = false): Seq[Field] = {
+    for (column <- schema) yield {
+      parser.getFields(column.comment, column.name.head, column.dataType, isExternal)
+    }
+  }
+
+  /**
+   * Returns the value of `supportsColumnar` on the given carbon scan node.
+   */
+  def supportsBatchOrColumnar(scan: CarbonDataSourceScan): Boolean = scan.supportsColumnar
+
+  /**
+   * Builds a `Dataset[Row]` on top of the given query execution, using a row encoder
+   * created from the schema of its analyzed plan. `sparkSession` is not read here.
+   */
+  def createDataset(sparkSession: SparkSession, qe: QueryExecution) : Dataset[Row] = {
+    val analyzedSchema = qe.analyzed.schema
+    val rowEncoder = RowEncoder(analyzedSchema)
+    new Dataset[Row](qe, rowEncoder)
+  }
+
+  /**
+   * Creates a `SharedState` for the given Spark context, passing an empty
+   * string-to-string map as the second constructor argument.
+   */
+  def createSharedState(sparkContext: SparkContext) : SharedState = {
+    val emptyOptions = Map.empty[String, String]
+    new SharedState(sparkContext, emptyOptions)
+  }
+
+  /**
+   * Runs every expression through `DataSourceStrategy.translateFilter` with
+   * `supportNestedPredicatePushdown` set to false and flattens the per-expression
+   * results into a single sequence of filters.
+   */
+  def translateFilter(dataFilters: Seq[Expression]) : Seq[Filter] = {
+    dataFilters.flatMap { expression =>
+      DataSourceStrategy.translateFilter(expression, supportNestedPredicatePushdown = false)
+    }
+  }
+
+  /**
+   * Creates a `CarbonOptimizer` from the session and the optimizer held by the
+   * given session state.
+   */
+  def getCarbonOptimizer(session: SparkSession,
+                         sessionState: SessionState): CarbonOptimizer = {
+    val sparkOptimizer = sessionState.optimizer
+    new CarbonOptimizer(session, sparkOptimizer)
+  }
+
+  /**
+   * Returns true when `dataType` is an instance of `CharType`, false otherwise.
+   */
+  def isCharType(dataType: DataType): Boolean = dataType match {
+    case _: CharType => true
+    case _ => false
+  }
+
+  /**
+   * Returns true when `dataType` is an instance of `VarcharType`, false otherwise.
+   */
+  def isVarCharType(dataType: DataType): Boolean = dataType match {
+    case _: VarcharType => true
+    case _ => false
+  }
+
+  /**
+   * Returns the `typeName` of the default concrete type of the given abstract data type.
+   */
+  def getTypeName(s: AbstractDataType): String = {
+    val concreteType = s.defaultConcreteType
+    concreteType.typeName
+  }
+
+  /**
+   * Wraps the given arguments in an `InsertIntoStatement`.
+   *
+   * All parameters are forwarded positionally and unchanged; the third constructor
+   * argument is always `Nil`.
+   * NOTE(review): the third argument is presumably the user-specified column list in
+   * this Spark version - confirm against the `InsertIntoStatement` signature.
+   */
+  def getInsertIntoCommand(table: LogicalPlan,
+                           partition: Map[String, Option[String]],
+                           query: LogicalPlan,
+                           overwrite: Boolean,
+                           ifPartitionNotExists: Boolean): InsertIntoStatement =
+    InsertIntoStatement(table, partition, Nil, query, overwrite, ifPartitionNotExists)
+}
+
+/**
+ * Thin wrapper around a `BuildSide` value that exposes which side it is as booleans.
+ */
+case class CarbonBuildSide(buildSide: BuildSide) {
+  /** True when the wrapped value is the `BuildRight` singleton. */
+  def isRight: Boolean = buildSide match {
+    case _: BuildRight.type => true
+    case _ => false
+  }
+
+  /** True when the wrapped value is the `BuildLeft` singleton. */
+  def isLeft: Boolean = buildSide match {
+    case _: BuildLeft.type => true
+    case _ => false
+  }
+}
+
+/**
+ * Base class that supplies the `simpleString(maxFields)` rendering for the carbon
+ * take-ordered-and-project node: limit, sort order, the two boolean flags and the
+ * output attributes. `maxFields` is not used when building the string.
+ */
+abstract class CarbonTakeOrderedAndProjectExecHelper(sortOrder: Seq[SortOrder],
+    limit: Int, skipMapOrder: Boolean, readFromHead: Boolean) extends UnaryExecNode {
+  override def simpleString(maxFields: Int): String = {
+    // Renders a sequence as "[a,b,c]" (comma separated, no spaces).
+    def bracketed(items: Seq[Any]): String = items.mkString("[", ",", "]")
+
+    val orderByString = bracketed(sortOrder)
+    val outputString = bracketed(output)
+    s"CarbonTakeOrderedAndProjectExec(limit=$limit, orderBy=$orderByString, " +
+      s"skipMapOrder=$skipMapOrder, readFromHead=$readFromHead, output=$outputString)"
+  }
+}

Review comment:
       Add a newline after the class. All of these should be caught in CI; please check once.

##########
File path: mv/plan/src/main/spark2.3/org/apache/carbondata/mv/plans/modular/ExpressionHelper.scala
##########
@@ -46,5 +46,4 @@ object ExpressionHelper {
   def getTheLastQualifier(reference: AttributeReference): String = {
     reference.qualifier.head
   }
-
 }

Review comment:
       Please revert the changes to this class; they are not needed.

##########
File path: scalastyle-config.xml
##########
@@ -203,19 +203,6 @@ This file is divided into 3 sections:
     ]]></customMessage>
  </check>
 
- <check customId="awaitresult" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
-  <parameters>
-   <parameter name="regex">Await\.result</parameter>
-  </parameters>
-  <customMessage><![CDATA[
-      Are you sure that you want to use Await.result? In most cases, you should use ThreadUtils.awaitResult instead.
-      If you must use Await.result, wrap the code block with
-      // scalastyle:off awaitresult
-      Await.result(...)
-      // scalastyle:on awaitresult
-    ]]></customMessage>
- </check>
-

Review comment:
       Why was this removed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org