You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@carbondata.apache.org by GitBox <gi...@apache.org> on 2021/05/26 12:33:02 UTC

[GitHub] [carbondata] akashrn5 commented on a change in pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

akashrn5 commented on a change in pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#discussion_r639004523



##########
File path: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
##########
@@ -439,10 +445,31 @@ public static Object getDataDataTypeForNoDictionaryColumn(String dimensionValue,
     }
   }
 
+  private static long createTimeInstant(String dimensionValue, String dateFormat) {
+    String updatedDim = dimensionValue;
+    if (!dimensionValue.trim().contains(" ")) {
+      updatedDim += " 00:00:00";
+    }
+    Instant instant = Instant.from(ZonedDateTime
+        .of(LocalDateTime.parse(updatedDim, DateTimeFormatter.ofPattern(dateFormat)),
+        ZoneId.systemDefault()));
+    validateTimeStampRange(instant.getEpochSecond());
+    long us = Math.multiplyExact(instant.getEpochSecond(), 1000L);
+    return Math.addExact(us, instant.getNano() * 1000L);
+  }
+
   private static Object parseTimestamp(String dimensionValue, String dateFormat) {
     Date dateToStr;
     DateFormat dateFormatter = null;
     long timeValue;
+    if (Boolean.parseBoolean(CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+        .CARBON_SPARK_VERSION_SPARK3))) {
+      try {
+        return createTimeInstant(dimensionValue, dateFormat);

Review comment:
       Please add a comment here explaining why this change is needed for Spark 3, for future reference.

##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -2644,5 +2644,4 @@ private CarbonCommonConstants() {
   public static final String CARBON_MAP_ORDER_PUSHDOWN = "carbon.mapOrderPushDown";
 
   public static final String CARBON_SDK_EMPTY_METADATA_PATH = "emptyMetadataFolder";
-

Review comment:
       revert this change

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/LuceneIndexExample.scala
##########
@@ -61,37 +61,37 @@ object LuceneIndexExample {
       """.stripMargin)
 
     // 1. Compare the performance:
-
-    def time(code: => Unit): Double = {
-      val start = System.currentTimeMillis()
-      code
-      // return time in second
-      (System.currentTimeMillis() - start).toDouble / 1000
-    }
-
-    val timeWithoutLuceneIndex = time {
-
-      spark.sql(
-        s"""
-           | SELECT count(*)
-           | FROM personTable where id like '% test1 %'
-      """.stripMargin).show()
-
-    }
-
-    val timeWithLuceneIndex = time {
-
-      spark.sql(
-        s"""
-           | SELECT count(*)
-           | FROM personTable where TEXT_MATCH('id:test1')
-      """.stripMargin).show()
-
-    }
+    // TODO: Revert this after SPARK 3.1 fix

Review comment:
       Can you please mention the Spark JIRA ID here, so that reviewers can know the exact problem?

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/LuceneIndexExample.scala
##########
@@ -30,7 +30,7 @@ object LuceneIndexExample {
 
   def main(args: Array[String]) {
     val spark = ExampleUtils.createSparkSession("LuceneIndexExample")
-    exampleBody(spark)
+      exampleBody(spark)

Review comment:
       revert this 

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
##########
@@ -151,7 +151,7 @@ object StructuredStreamingExample {
           // Write data from socket stream to carbondata file
           qry = readSocketDF.writeStream
             .format("carbondata")
-            .trigger(ProcessingTime("5 seconds"))
+            .trigger(CarbonToSparkAdapter.getProcessingTime("5 seconds"))

Review comment:
       `getProcessingTime` calls the same API in both 2.4 and 3.1, and the package of the `Trigger` class is also the same, so why do we need to put it in a different package? Can you please explain?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
##########
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{CarbonToSparkAdapter, SparkSession}

Review comment:
       can we keep this class in the same package structure as old?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -360,9 +359,14 @@ object CarbonStore {
 
   private def validateTimeFormat(timestamp: String): Long = {
     try {
-      DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp)).get
+      CarbonToSparkAdapter.stringToTimestamp(timestamp) match {
+        case Some(value) => value
+        case _ =>
+          val errorMessage = "Error: Invalid load start time format: " + timestamp

Review comment:
       There is no need to define the same error message here, as it's already handled in the catch block; you can just throw a runtime exception.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
##########
@@ -67,6 +68,8 @@ object DDLStrategy extends SparkStrategy {
       case changeColumn: AlterTableChangeColumnCommand
         if isCarbonTable(changeColumn.tableName) =>
         ExecutedCommandExec(DDLHelper.changeColumn(changeColumn, sparkSession)) :: Nil
+      case changeColumn: CarbonAlterTableColRenameDataTypeChangeCommand =>

Review comment:
       The command will always come as a change column command. In which case is this required, given that it wasn't there before?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/SparkBlockletIndexLoaderJob.scala
##########
@@ -160,9 +160,7 @@ class IndexLoaderRDD(
     val reader = indexFormat.createRecordReader(inputSplit, attemptContext)
     val iter = new Iterator[(TableBlockIndexUniqueIdentifier, BlockletIndexDetailsWithSchema)] {
       // in case of success, failure or cancellation clear memory and stop execution
-      context.addTaskCompletionListener { _ =>
-        reader.close()
-      }
+      CarbonToSparkAdapter.addTaskCompletionListener(reader.close())

Review comment:
       Here also I can see the same code in 3.1 and 2.4, so why is this required?

##########
File path: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
##########
@@ -439,10 +445,31 @@ public static Object getDataDataTypeForNoDictionaryColumn(String dimensionValue,
     }
   }
 
+  private static long createTimeInstant(String dimensionValue, String dateFormat) {
+    String updatedDim = dimensionValue;
+    if (!dimensionValue.trim().contains(" ")) {
+      updatedDim += " 00:00:00";

Review comment:
       Please add a comment here explaining why we are appending this string, and then add a TODO to analyze it and change it later once the integration is complete in the alpha stage.

##########
File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
##########
@@ -185,8 +224,390 @@ object CarbonToSparkAdapter {
   def getHiveExternalCatalog(sparkSession: SparkSession): HiveExternalCatalog = {
     sparkSession.sessionState.catalog.externalCatalog.asInstanceOf[HiveExternalCatalog]
   }
+
+  def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = {
+    FilePartition(index, files.toArray.toSeq)
+  }
+
+  def stringToTimestamp(timestamp: String): Option[Long] = {
+    DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp))
+  }
+
+  def getTableIdentifier(u: UnresolvedRelation): Some[TableIdentifier] = {
+    Some(u.tableIdentifier)
+  }
+
+  def dateToString(date: Int): String = {
+    DateTimeUtils.dateToString(date.toString.toInt)
+  }
+
+  def timeStampToString(timeStamp: Long): String = {
+    DateTimeUtils.timestampToString(timeStamp)
+  }
+
+  def stringToTime(value: String): java.util.Date = {
+    DateTimeUtils.stringToTime(value)
+  }
+
+  def getProcessingTime: String => Trigger = {
+    Trigger.ProcessingTime
+  }
+
+  def addTaskCompletionListener[U](f: => U) {
+    TaskContext.get().addTaskCompletionListener { context =>
+      f
+    }
+  }
+
+  def createShuffledRowRDD(sparkContext: SparkContext, localTopK: RDD[InternalRow],
+      child: SparkPlan, serializer: Serializer): ShuffledRowRDD = {
+    new ShuffledRowRDD(
+      ShuffleExchangeExec.prepareShuffleDependency(
+        localTopK, child.output, SinglePartition, serializer))
+  }
+
+  def getInsertIntoCommand(table: LogicalPlan,
+      partition: Map[String, Option[String]],
+      query: LogicalPlan,
+      overwrite: Boolean,
+      ifPartitionNotExists: Boolean): InsertIntoTable = {
+    InsertIntoTable(
+      table,
+      partition,
+      query,
+      overwrite,
+      ifPartitionNotExists)
+  }
+
+  def getExplainCommandObj(logicalPlan: LogicalPlan = OneRowRelation(),
+      mode: Option[String]) : ExplainCommand = {
+    ExplainCommand(logicalPlan, mode.isDefined)
+  }
+
+  def getExplainCommandObj(mode: Option[String]) : ExplainCommand = {
+    ExplainCommand(OneRowRelation(), mode.isDefined)
+  }
+
+  def invokeAnalyzerExecute(analyzer: Analyzer,
+      plan: LogicalPlan): LogicalPlan = {
+    analyzer.executeAndCheck(plan)
+  }
+
+  def normalizeExpressions(r: NamedExpression, attrs: AttributeSeq): NamedExpression = {
+    QueryPlan.normalizeExprId(r, attrs)
+  }
+
+  def getBuildRight: BuildSide = {
+    BuildRight
+  }
+
+  def getBuildLeft: BuildSide = {
+    BuildLeft
+  }
+
+  type CarbonBuildSideType = BuildSide
+  type InsertIntoStatementWrapper = InsertIntoTable
+
+  def withNewExecutionId[T](sparkSession: SparkSession, queryExecution: QueryExecution): T => T = {
+    SQLExecution.withNewExecutionId(sparkSession, queryExecution)(_)
+  }
+
+  def getTableIdentifier(parts: TableIdentifier): TableIdentifier = {
+    parts
+  }
+
+  def createJoinNode(child: LogicalPlan,
+      targetTable: LogicalPlan,
+      joinType: JoinType,
+      condition: Option[Expression]): Join = {
+    Join(child, targetTable, joinType, condition)
+  }
+
+  def getPartitionsFromInsert(x: InsertIntoStatementWrapper): Map[String, Option[String]] = {
+    x.partition
+  }
+
+  def getStatisticsObj(outputList: Seq[NamedExpression],

Review comment:
       I think no one is using this method; it can be removed if it is unused.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala
##########
@@ -71,6 +71,12 @@ object SparkTestQueryExecutor {
     .config("spark.sql.warehouse.dir", warehouse)
     .config("spark.sql.crossJoin.enabled", "true")
     .config("spark.sql.extensions", extensions)
+    .config("spark.sql.storeAssignmentPolicy", "legacy")
+    .config("spark.sql.legacy.timeParserPolicy", "legacy")

Review comment:
       Add a TODO to write test cases for non-legacy.

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
##########
@@ -229,8 +270,388 @@ object CarbonToSparkAdapter {
       .unwrapped
       .asInstanceOf[HiveExternalCatalog]
   }
+
+  def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]): FilePartition = {
+    FilePartition(index, files.toArray)
+  }
+
+  def stringToTimestamp(timestamp: String): Option[Long] = {
+    DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp))
+  }
+
+  def stringToTime(value: String): java.util.Date = {
+    DateTimeUtils.stringToTime(value)
+  }
+
+  def timeStampToString(timeStamp: Long): String = {
+    DateTimeUtils.timestampToString(timeStamp)
+  }
+
+  def getTableIdentifier(u: UnresolvedRelation): Some[TableIdentifier] = {
+    Some(u.tableIdentifier)
+  }
+
+  def dateToString(date: Int): String = {
+    DateTimeUtils.dateToString(date.toString.toInt)
+  }
+
+  def getProcessingTime: String => Trigger = {
+    Trigger.ProcessingTime
+  }
+
+  def addTaskCompletionListener[U](f: => U) {
+    TaskContext.get().addTaskCompletionListener { context =>
+      f
+    }
+  }
+
+  def createShuffledRowRDD(sparkContext: SparkContext, localTopK: RDD[InternalRow],
+      child: SparkPlan, serializer: Serializer): ShuffledRowRDD = {
+    new ShuffledRowRDD(
+      ShuffleExchangeExec.prepareShuffleDependency(
+        localTopK, child.output, SinglePartition, serializer))
+  }
+
+  def getInsertIntoCommand(table: LogicalPlan,
+      partition: Map[String, Option[String]],
+      query: LogicalPlan,
+      overwrite: Boolean,
+      ifPartitionNotExists: Boolean): InsertIntoTable = {
+    InsertIntoTable(
+      table,
+      partition,
+      query,
+      overwrite,
+      ifPartitionNotExists)
+  }
+
+  def getExplainCommandObj(logicalPlan: LogicalPlan = OneRowRelation(),
+      mode: Option[String]) : ExplainCommand = {
+    ExplainCommand(logicalPlan, mode.isDefined)
+  }
+
+  def invokeAnalyzerExecute(analyzer: Analyzer,
+      plan: LogicalPlan): LogicalPlan = {
+    analyzer.executeAndCheck(plan)
+  }
+
+  def normalizeExpressions(r: NamedExpression, attrs: AttributeSeq): NamedExpression = {
+    QueryPlan.normalizeExprId(r, attrs)
+  }
+
+  def getBuildRight: BuildSide = {
+    BuildRight
+  }
+
+  def getBuildLeft: BuildSide = {
+    BuildLeft
+  }
+
+  type CarbonBuildSideType = BuildSide
+  type InsertIntoStatementWrapper = InsertIntoTable
+
+  def withNewExecutionId[T](sparkSession: SparkSession, queryExecution: QueryExecution): T => T = {
+    SQLExecution.withNewExecutionId(sparkSession, queryExecution)(_)
+  }
+
+  def createJoinNode(child: LogicalPlan,
+      targetTable: LogicalPlan,
+      joinType: JoinType,
+      condition: Option[Expression]): Join = {
+    Join(child, targetTable, joinType, condition)
+  }
+
+  def getPartitionsFromInsert(x: InsertIntoStatementWrapper): Map[String, Option[String]] = {
+    x.partition
+  }
+
+  def getTableIdentifier(parts: TableIdentifier): TableIdentifier = {
+    parts
+  }
+
+  def getStatisticsObj(outputList: Seq[NamedExpression],
+      plan: LogicalPlan, stats: Statistics,
+      aliasMap: Option[AttributeMap[Attribute]] = None): Statistics = {
+    val output = outputList.map(_.toAttribute)
+    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+      table => AttributeMap(table.output.zip(output))
+    }
+    val rewrites = mapSeq.head
+    val attributes: AttributeMap[ColumnStat] = stats.attributeStats
+    var attributeStats = AttributeMap(attributes.iterator
+      .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+    if (aliasMap.isDefined) {
+      attributeStats = AttributeMap(
+        attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
+    }
+    Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, stats.hints)
+  }
+
+  def createRefreshTableCommand(tableIdentifier: TableIdentifier): RefreshTable = {
+    RefreshTable(tableIdentifier)
+  }
+
+  type RefreshTables = RefreshTable

Review comment:
       As observed, there is a lot of common code for 2.3 and 3.1, so please do the refactoring as suggested in the previous comment.

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala
##########
@@ -42,4 +42,4 @@ case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nu
   override def qualifier: Seq[String] = null
 
   override def newInstance(): NamedExpression = throw new UnsupportedOperationException
-}
\ No newline at end of file
+}

Review comment:
       revert this

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.strategy
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.CarbonInputMetrics
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, SortOrder, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.{ColumnarBatchScan, DataSourceScanExec, WholeStageCodegenExec}
+import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.types.AtomicType
+
+import org.apache.carbondata.core.index.IndexFilter
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.BucketingInfo
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+/**
+ *  Physical plan node for scanning data. It is applied for both tables
+ *  USING carbondata and STORED AS carbondata.
+ */
+case class CarbonDataSourceScan(

Review comment:
       This class has a lot of code duplicated from 2.3; please check and refactor.

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonAnalyzer.scala
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.CarbonReflectionUtils
+
+class CarbonAnalyzer(catalog: SessionCatalog,

Review comment:
       this is same for 2.3 and 2.4, please refactor to common package

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/execution/command/CarbonResetCommand.scala
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.command
+
+import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, Row, SparkSession}

Review comment:
       same as 2.3, please move to common

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
##########
@@ -93,6 +93,7 @@ object ExampleUtils {
       .config("spark.driver.host", "localhost")
       .config("spark.sql.crossJoin.enabled", "true")
       .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
+      .config("spark.sql.legacy.timeParserPolicy", "LEGACY")

Review comment:
       Please add a comment here noting that this needs to be analyzed and handled, and that tests should be added for policies other than LEGACY.

##########
File path: integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
##########
@@ -111,14 +111,13 @@ class TestCarbonPartitionWriter extends QueryTest with BeforeAndAfterAll {
         s"(SELECT * FROM $tableName" + "_segments)").collect()
       assert(rows.length > 0)
       for (index <- 0 until unloadedStageCount) {
-        val row = rows(index)
         assert(rows(index).getString(0) == null)
         assert(rows(index).getString(1).equals("Unload"))
         assert(rows(index).getString(2) != null)
-        assert(rows(index).getLong(3) == -1)
+        assert(rows(index).getString(3) == "-1")

Review comment:
       Why do we need to change this here? Can you please explain the reason behind it?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
##########
@@ -183,7 +183,7 @@ object DataLoadProcessorStepOnSpark {
     val rowConverter = new RowConverterImpl(conf.getDataFields, conf, badRecordLogger)
     rowConverter.initialize()
 
-    TaskContext.get().addTaskCompletionListener { context =>
+    CarbonToSparkAdapter.addTaskCompletionListener {

Review comment:
       Here also I don't see any difference between 2.4 and 3.1, so why was this refactored? Can you please explain?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
##########
@@ -321,7 +321,7 @@ object DeleteExecution {
           deleteStatus = SegmentStatus.SUCCESS
         } catch {
           case e : MultipleMatchingException =>
-            LOGGER.error(e.getMessage)
+          LOGGER.error(e.getMessage)

Review comment:
       revert this

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
##########
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+import java.sql.{Date, Timestamp}
+import java.time.ZoneId
+
+import javax.xml.bind.DatatypeConverter
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
+import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, Predicate, ScalaUDF, SortOrder, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, Optimizer}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan, logical}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType, Statistics, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, SQLExecution, ShuffledRowRDD, SimpleMode, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, RefreshTableCommand, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.{checkIfDuplicateColumnExists, convertDbNameToLowerCase, validateStreamingProperty}
+import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.sql.types.{AbstractDataType, CharType, DataType, Metadata, StringType, StructField, VarcharType}
+import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.geo.{InPolygonJoinUDF, ToRangeListAsStringUDF}
+import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, Select}
+import org.apache.carbondata.spark.CarbonOption
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+object CarbonToSparkAdapter {
+
+  def addSparkSessionListener(sparkSession: SparkSession): Unit = {
+    sparkSession.sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        CarbonEnv.carbonEnvMap.remove(sparkSession)
+        ThreadLocalSessionInfo.unsetAll()
+      }
+    })
+  }
+
+  def addSparkListener(sparkContext: SparkContext): Unit = {
+    sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        SparkSession.setDefaultSession(null)
+      }
+    })
+  }
+
+  def createAttributeReference(
+      name: String,
+      dataType: DataType,
+      nullable: Boolean,
+      metadata: Metadata,
+      exprId: ExprId,
+      qualifier: Option[String],
+      attrRef : NamedExpression = null): AttributeReference = {
+    val qf = if (qualifier.nonEmpty) Seq(qualifier.get) else Seq.empty
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qf)
+  }
+
+  def createAttributeReference(
+      name: String,
+      dataType: DataType,
+      nullable: Boolean,
+      metadata: Metadata,
+      exprId: ExprId,
+      qualifier: Seq[String]): AttributeReference = {
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qualifier)
+  }
+
+  def lowerCaseAttribute(expression: Expression): Expression = expression.transform {
+    case attr: AttributeReference =>
+      CarbonToSparkAdapter.createAttributeReference(
+        attr.name.toLowerCase,
+        attr.dataType,
+        attr.nullable,
+        attr.metadata,
+        attr.exprId,
+        attr.qualifier)
+  }
+
+  def createAttributeReference(attr: AttributeReference,
+      attrName: String,
+      newSubsume: String): AttributeReference = {
+    AttributeReference(attrName, attr.dataType)(
+      exprId = attr.exprId,
+      qualifier = newSubsume.split("\n").map(_.trim))
+  }
+
+  def createScalaUDF(s: ScalaUDF, reference: AttributeReference): ScalaUDF = {
+    s.copy(children = Seq(reference))
+  }
+
+  def createExprCode(code: String, isNull: String, value: String, dataType: DataType): ExprCode = {
+    ExprCode(
+      code"$code",
+      JavaCode.isNullVariable(isNull),
+      JavaCode.variable(value, dataType))
+  }
+
+  def createAliasRef(
+      child: Expression,
+      name: String,
+      exprId: ExprId = NamedExpression.newExprId,
+      qualifier: Seq[String] = Seq.empty,
+      explicitMetadata: Option[Metadata] = None,
+      namedExpr: Option[NamedExpression] = None) : Alias = {

Review comment:
       looks like `namedExpr` is not used, please check in other files and remove

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
##########
@@ -46,12 +49,25 @@ case class CarbonShowSegmentsAsSelectCommand(
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
   }
-  private lazy val df = createDataFrame
+
+  val tempViewName: String = makeTempViewName(carbonTable)
+
+  val df: DataFrame = createDataFrame
 
   override def output: Seq[Attribute] = {
-    df.queryExecution.analyzed.output.map { attr =>
-      AttributeReference(attr.name, attr.dataType, nullable = true)()
+    var attrList: Seq[Attribute] = Seq()

Review comment:
       Should this Seq be declared at class level, in case `output` is called multiple times? @kunal642 @vikramahuja1001 

##########
File path: index/examples/pom.xml
##########
@@ -81,9 +81,6 @@
   <profiles>
     <profile>
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>

Review comment:
       Do we need to add a new profile here and make it active by default? @kunal642 @vikramahuja1001 

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkSqlAdapter.scala
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, Expression}
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.types.StructType
+
+object SparkSqlAdapter {
+
+  def initSparkSQL(): Unit = {
+  }
+
+  def getScanForSegments(
+      @transient relation: HadoopFsRelation,

Review comment:
       I don't see any difference between the profiles; please check and remove.

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonBoundReference.scala
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.core.scan.expression.ColumnExpression
+
+case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)

Review comment:
       I can see this class is the same in all profiles; what is the change here?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
##########
@@ -181,7 +182,7 @@ object CarbonSparkUtil {
    */
   def createHadoopJob(conf: Configuration = FileFactory.getConfiguration): Job = {
     val jobConf = new JobConf(conf)
-    SparkHadoopUtil.get.addCredentials(jobConf)
+    SparkUtil.addCredentials(jobConf)

Review comment:
       There is no need to add a method for just one line, and it is only called from here, so I feel this change can be reverted.

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review comment:
       same as 2.3, please move

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
##########
@@ -82,10 +83,10 @@ case class MergeProjection(
       if (output.contains(null)) {
         throw new CarbonMergeDataSetException(s"Not all columns are mapped")
       }
-      (new InterpretedMutableProjection(output ++ Seq(
+      (output ++ Seq(
         ds.queryExecution.analyzed.resolveQuoted(statusCol,
           sparkSession.sessionState.analyzer.resolver).get),
-        ds.queryExecution.analyzed.output), expectOutput)

Review comment:
       Why is `expectOutput` removed here?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
##########
@@ -78,10 +78,8 @@ case class CarbonCreateDataSourceTableCommand(
       catalogTable.partitionColumnNames,
       caseSensitiveAnalysis)
     val rows = try {
-      CreateDataSourceTableCommand(
-        catalogTable,
-        ignoreIfExists
-      ).run(sparkSession)
+      org.apache.spark.sql.execution.CreateDataSourceTableCommand
+        .createDataSource(catalogTable, ignoreIfExists, sparkSession)

Review comment:
       I can see the same code for `CreateDataSourceTableCommand` in both 3.1 and 2.4, so it is better to create a common package and keep it clean instead of duplicating. We followed the same approach in previous upgrades as well.

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
##########
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+import java.sql.{Date, Timestamp}
+import java.time.ZoneId
+
+import javax.xml.bind.DatatypeConverter
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
+import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, Predicate, ScalaUDF, SortOrder, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, Optimizer}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan, logical}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType, Statistics, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, SQLExecution, ShuffledRowRDD, SimpleMode, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, RefreshTableCommand, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.{checkIfDuplicateColumnExists, convertDbNameToLowerCase, validateStreamingProperty}
+import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.sql.types.{AbstractDataType, CharType, DataType, Metadata, StringType, StructField, VarcharType}
+import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.geo.{InPolygonJoinUDF, ToRangeListAsStringUDF}
+import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, Select}
+import org.apache.carbondata.spark.CarbonOption
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+object CarbonToSparkAdapter {

Review comment:
       Please pull out the code that is common to 2.4 and 3.1 and move it to a common package to avoid code duplication.

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/LuceneIndexExample.scala
##########
@@ -60,40 +60,6 @@ object LuceneIndexExample {
          | AS 'lucene'
       """.stripMargin)
 
-    // 1. Compare the performance:
-
-    def time(code: => Unit): Double = {
-      val start = System.currentTimeMillis()
-      code
-      // return time in second
-      (System.currentTimeMillis() - start).toDouble / 1000
-    }
-
-    val timeWithoutLuceneIndex = time {
-
-      spark.sql(
-        s"""
-           | SELECT count(*)
-           | FROM personTable where id like '% test1 %'
-      """.stripMargin).show()
-
-    }
-
-    val timeWithLuceneIndex = time {
-
-      spark.sql(

Review comment:
       Why was this code removed?

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
##########
@@ -0,0 +1,734 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+import java.sql.{Date, Timestamp}
+import java.time.ZoneId
+
+import javax.xml.bind.DatatypeConverter
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
+import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}

Review comment:
       The import order doesn't comply with the scalastyle rules; please correct it. I wonder how the CI is passing with style errors. Please check the new CI and correct it, and make sure to check all the new files once for style issues; it will now show the errors directly in red.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/BroadCastPolygonFilterPushJoin.scala
##########
@@ -47,16 +45,13 @@ case class BroadCastPolygonFilterPushJoin(
     leftKeys: Seq[Expression],
     rightKeys: Seq[Expression],
     joinType: JoinType,
-    buildSide: BuildSide,
     condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan
-) extends BinaryExecNode with HashJoin {

Review comment:
       Why did we remove `HashJoin` here?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
##########
@@ -704,5 +705,13 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
     columnSchema
   }
 
+  override def clone(): LogicalPlan = {
+    CarbonCreateSecondaryIndexCommand(indexModel,

Review comment:
       Why is this required?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
##########
@@ -501,40 +321,6 @@ object CarbonSparkSqlParserUtil {
     }
   }
 
-  /**
-   * Validates the partition columns and return's A tuple of partition columns and partitioner
-   * fields.
-   *
-   * @param partitionColumns        An instance of ColTypeListContext having parser rules for
-   *                                column.
-   * @param colNames                <Seq[String]> Sequence of Table column names.
-   * @param tableProperties         <Map[String, String]> Table property map.
-   * @param partitionByStructFields Seq[StructField] Sequence of partition fields.
-   * @return <Seq[PartitionerField]> A Seq of partitioner fields.
-   */
-  def validatePartitionFields(

Review comment:
       This method and the create-table method above look the same in both 3.1 and 2.4, with only a small diff. We will do a proper refactoring: if the code is common to 2.4 and 3.1, then, as mentioned in one of the previous comments, we will add a common package and do a clean refactor.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
##########
@@ -604,11 +605,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
 
   protected lazy val alterTableColumnRenameAndModifyDataType: Parser[LogicalPlan] =
-    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ CHANGE ~ ident ~ ident ~
-    ident ~ opt("(" ~> rep1sep(valueOptions, ",") <~ ")") <~ opt(";") ^^ {
-      case dbName ~ table ~ change ~ columnName ~ columnNameCopy ~ dataType ~ values =>
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (CHANGE ~> ident) ~ ident ~ ident ~

Review comment:
       Please update the DDL in the documentation, add an example that includes a comment, and add test cases with a comment. Now we change the comment from properties.

##########
File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
##########
@@ -185,8 +224,390 @@ object CarbonToSparkAdapter {
   def getHiveExternalCatalog(sparkSession: SparkSession): HiveExternalCatalog = {
     sparkSession.sessionState.catalog.externalCatalog.asInstanceOf[HiveExternalCatalog]
   }
+
+  def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = {
+    FilePartition(index, files.toArray.toSeq)
+  }
+
+  def stringToTimestamp(timestamp: String): Option[Long] = {
+    DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp))
+  }
+
+  def getTableIdentifier(u: UnresolvedRelation): Some[TableIdentifier] = {
+    Some(u.tableIdentifier)
+  }
+
+  def dateToString(date: Int): String = {
+    DateTimeUtils.dateToString(date.toString.toInt)
+  }
+
+  def timeStampToString(timeStamp: Long): String = {
+    DateTimeUtils.timestampToString(timeStamp)
+  }

Review comment:
       Among these, I can see many methods that duplicate the 2.4 code; please refactor as suggested in the earlier comment.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/util/CreateTableCommonUtil.scala
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.apache.log4j.Logger
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+
+object CreateTableCommonUtil {
+
+  def getNewTable(sparkSession: SparkSession, sessionState: SessionState,

Review comment:
       add comment for method and rename to getCatalogtable

##########
File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/execution/command/CarbonResetCommand.scala
##########
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.execution.command
 
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, Row, SparkSession}

Review comment:
       revert if not required

##########
File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/execution/CarbonCodegenSupport.scala
##########
@@ -14,15 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.spark.sql.execution
 
-package org.apache.carbondata.spark.adapter
+import org.apache.spark.sql.execution.joins.HashJoin
 
-import scala.collection.mutable.ArrayBuffer
+trait CarbonCodegenSupport extends SparkPlan with HashJoin {

Review comment:
       Add a comment for the trait explaining why it is required. Since it already extends `HashJoin` here, I think the implementations do not need to add `with HashJoin` again; please check once.

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.util.CreateTableCommonUtil.getNewTable
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
+  extends RunnableCommand {
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+    val sessionState = sparkSession.sessionState
+    if (sessionState.catalog.tableExists(table.identifier)) {
+      if (ignoreIfExists) {
+        return Seq.empty[Row]
+      } else {
+        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
+      }
+    }
+    val newTable: CatalogTable = getNewTable(sparkSession, sessionState, table, LOGGER)
+
+    // We will return Nil or throw exception at the beginning if the table already exists, so when
+    // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
+    sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false)
+    Seq.empty[Row]
+  }
+}
+
+object CreateDataSourceTableCommand {

Review comment:
       i think this can be combined with 2.3

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
##########
@@ -214,9 +214,7 @@ class CarbonSIRebuildRDD[K, V](
           new SparkDataTypeConverterImpl)
 
         // add task completion listener to clean up the resources
-        context.addTaskCompletionListener { _ =>
-          close()
-        }
+        CarbonToSparkAdapter.addTaskCompletionListener(close())

Review comment:
       same as old comment

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/execution/CarbonCodegenSupport.scala
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.execution.joins.HashJoin
+
+trait CarbonCodegenSupport extends SparkPlan with HashJoin {

Review comment:
       Same comment as this class in 2.3, also please make common package and keep there as no change in 2.3 and 2.4

##########
File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+
+object CreateDataSourceTableCommand {

Review comment:
       remove this from here and add to `CarbonToSparkAdapter` similar to other profiles

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.parser.ParserUtils.string
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.internal.SQLConf

Review comment:
       this is same for 2.3 and 2.4, please move to common package

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.CarbonToSparkAdapter

Review comment:
       this is same as 2.3, please refactor and move to common

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parser
+

Review comment:
       same as 2.3, please move to common

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/avro/AvroFileFormatFactory.scala
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro

Review comment:
       This is the same code as in 2.4; please move it to the common package.

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/execution/CarbonCodegenSupport.scala
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.execution.joins.HashJoin
+
+trait CarbonCodegenSupport extends SparkPlan with HashJoin {
+
+  // TODO: Spark has started supporting Codegen for Join, Carbon needs to implement the same.
+  override def supportCodegen: Boolean = false

Review comment:
       You can create a sub-JIRA under this one and track it there.

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.util.CreateTableCommonUtil.getNewTable
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
+  extends RunnableCommand {
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+
+    val sessionState = sparkSession.sessionState
+    if (sessionState.catalog.tableExists(table.identifier)) {
+      if (ignoreIfExists) {
+        return Seq.empty[Row]
+      } else {
+        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
+      }

Review comment:
       I can see the same code for 2.4 and 3.1; please refactor. Also, add an extra line at the end of the file. It's a style issue, so please check why the new CI is not catching it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org