You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/05 12:40:12 UTC

[4/8] carbondata git commit: [CARBONDATA-2532][Integration] Carbon to support spark 2.3.1 version(Make API changes in carbon to be compatible with spark 2.3)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
deleted file mode 100644
index dfb89fd..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.CarbonReflectionUtils
-
-class CarbonAnalyzer(catalog: SessionCatalog,
-    conf: SQLConf,
-    sparkSession: SparkSession,
-    analyzer: Analyzer) extends Analyzer(catalog, conf) {
-
-  val mvPlan = try {
-    CarbonReflectionUtils.createObject(
-      "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
-      sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
-  } catch {
-    case e: Exception =>
-      null
-  }
-
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    var logicalPlan = analyzer.execute(plan)
-    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
-    logicalPlan = CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
-    if (mvPlan != null) {
-      mvPlan.apply(logicalPlan)
-    } else {
-      logicalPlan
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
deleted file mode 100644
index ba6aae5..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.CarbonSparkSqlParser
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
-
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * This class will have carbon catalog and refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class InMemorySessionCatalog(
-    externalCatalog: ExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
-    functionRegistry: FunctionRegistry,
-    sparkSession: SparkSession,
-    conf: SQLConf,
-    hadoopConf: Configuration,
-    parser: ParserInterface,
-    functionResourceLoader: FunctionResourceLoader)
-  extends SessionCatalog(
-    externalCatalog,
-    globalTempViewManager,
-    functionRegistry,
-    conf,
-    hadoopConf,
-    parser,
-    functionResourceLoader
-  ) with CarbonSessionCatalog {
-
-  override def alterTableRename(oldTableIdentifier: TableIdentifier,
-      newTableIdentifier: TableIdentifier,
-      newTablePath: String): Unit = {
-    sparkSession.sessionState.catalog.renameTable(oldTableIdentifier, newTableIdentifier)
-  }
-
-  override def alterTable(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    // NOt Required in case of In-memory catalog
-  }
-
-  override def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      newColumns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
-    val structType = catalogTable.schema
-    var newStructType = structType
-    newColumns.get.foreach {cols =>
-      newStructType = structType
-        .add(cols.getColumnName,
-          CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(cols.getDataType))
-    }
-    alterSchema(newStructType, catalogTable, tableIdentifier)
-  }
-
-  override def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      dropCols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
-    val fields = catalogTable.schema.fields.filterNot { field =>
-      dropCols.get.exists { col =>
-        col.getColumnName.equalsIgnoreCase(field.name)
-      }
-    }
-    alterSchema(new StructType(fields), catalogTable, tableIdentifier)
-  }
-
-  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      columns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
-    val a = catalogTable.schema.fields.flatMap { field =>
-      columns.get.map { col =>
-        if (col.getColumnName.equalsIgnoreCase(field.name)) {
-          StructField(col.getColumnName,
-            CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(col.getDataType))
-        } else {
-          field
-        }
-      }
-    }
-    alterSchema(new StructType(a), catalogTable, tableIdentifier)
-  }
-
-  private def alterSchema(structType: StructType,
-      catalogTable: CatalogTable,
-      tableIdentifier: TableIdentifier): Unit = {
-    val copy = catalogTable.copy(schema = structType)
-    sparkSession.sessionState.catalog.alterTable(copy)
-    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-  }
-
-  lazy val carbonEnv = {
-    val env = new CarbonEnv
-    env.init(sparkSession)
-    env
-  }
-
-  def getCarbonEnv() : CarbonEnv = {
-    carbonEnv
-  }
-
-  // Initialize all listeners to the Operation bus.
-  CarbonEnv.initListeners()
-
-  def getThriftTableInfo(tablePath: String): TableInfo = {
-    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
-    CarbonUtil.readSchemaFile(tableMetadataFile)
-  }
-
-  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
-    val rtnRelation = super.lookupRelation(name)
-    val isRelationRefreshed =
-      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
-    if (isRelationRefreshed) {
-      super.lookupRelation(name)
-    } else {
-      rtnRelation
-    }
-  }
-
-  /**
-   * returns hive client from HiveExternalCatalog
-   *
-   * @return
-   */
-  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
-    null
-  }
-
-  override def createPartitions(
-      tableName: TableIdentifier,
-      parts: Seq[CatalogTablePartition],
-      ignoreIfExists: Boolean): Unit = {
-    try {
-      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
-      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
-      super.createPartitions(tableName, updatedParts, ignoreIfExists)
-    } catch {
-      case e: Exception =>
-        super.createPartitions(tableName, parts, ignoreIfExists)
-    }
-  }
-
-  /**
-   * This is alternate way of getting partition information. It first fetches all partitions from
-   * hive and then apply filter instead of querying hive along with filters.
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier) = {
-    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
-  }
-
-  /**
-   * Update the storageformat with new location information
-   */
-  override def updateStorageLocation(
-      path: Path,
-      storage: CatalogStorageFormat,
-      newTableName: String,
-      dbName: String): CatalogStorageFormat = {
-    storage.copy(locationUri = Some(path.toUri))
-  }
-}
-
-class CarbonInMemorySessionStateBuilder (sparkSession: SparkSession,
-    parentState: Option[SessionState] = None)
-  extends SessionStateBuilder(sparkSession, parentState) {
-
-  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
-  experimentalMethods.extraStrategies =
-    Seq(new StreamingTableStrategy(sparkSession),
-      new CarbonLateDecodeStrategy,
-      new DDLStrategy(sparkSession)
-    )
-  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
-    new CarbonUDFTransformRule,
-    new CarbonLateDecodeRule)
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  override protected lazy val catalog: InMemorySessionCatalog = {
-    val catalog = new InMemorySessionCatalog(
-      externalCatalog,
-      session.sharedState.globalTempViewManager,
-      functionRegistry,
-      sparkSession,
-      conf,
-      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
-      sqlParser,
-      resourceLoader)
-    parentState.foreach(_.catalog.copyStateTo(catalog))
-    catalog
-  }
-
-  private def externalCatalog: ExternalCatalog =
-    session.sharedState.externalCatalog.asInstanceOf[ExternalCatalog]
-
-  override protected lazy val resourceLoader: SessionResourceLoader = {
-    new SessionResourceLoader(session)
-  }
-
-  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
-  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
-    new Analyzer(catalog, conf) {
-      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
-        new FindDataSourceTable(session) +:
-        new ResolveSQLOnFile(session) +:
-        new CarbonIUDAnalysisRule(sparkSession) +:
-        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
-      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
-        PreWriteCheck :: HiveOnlyCheck :: Nil
-      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
-        PreprocessTableCreation(session) +:
-        PreprocessTableInsertion(conf) +:
-        DataSourceAnalysis(conf) +:
-        customPostHocResolutionRules
-    }
-  )
-  override protected def newBuilder: NewBuilder = new CarbonInMemorySessionStateBuilder(_, _)
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
index a046763..2f9ad3e 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -17,12 +17,10 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, ExperimentalMethods}
+import org.apache.spark.sql.ExperimentalMethods
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.expressions.{Exists, In, ListQuery, ScalarSubquery}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkOptimizer
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SQLConf
 
 
@@ -37,41 +35,3 @@ class CarbonOptimizer(
     super.execute(transFormedPlan)
   }
 }
-
-object CarbonOptimizerUtil {
-  def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
-    // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And
-    // optimize whole plan at once.
-    val transFormedPlan = plan.transform {
-      case filter: Filter =>
-        filter.transformExpressions {
-          case s: ScalarSubquery =>
-            val tPlan = s.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            ScalarSubquery(tPlan, s.children, s.exprId)
-          case e: Exists =>
-            val tPlan = e.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
-
-          case In(value, Seq(l@ListQuery(sub, _, exprId))) =>
-            val tPlan = sub.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            In(value, Seq(ListQuery(tPlan, l.children, exprId)))
-        }
-    }
-    transFormedPlan
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
deleted file mode 100644
index a7a255e..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan}
-import org.apache.spark.sql.catalyst.parser.{ParserInterface, SqlBaseParser}
-import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
-import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
-import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SQLConf, SessionState}
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.CarbonSparkSqlParser
-
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * This class will have carbon catalog and refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class CarbonHiveSessionCatalog(
-    externalCatalog: HiveExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
-    functionRegistry: FunctionRegistry,
-    sparkSession: SparkSession,
-    conf: SQLConf,
-    hadoopConf: Configuration,
-    parser: ParserInterface,
-    functionResourceLoader: FunctionResourceLoader)
-  extends HiveSessionCatalog (
-    externalCatalog,
-    globalTempViewManager,
-    new HiveMetastoreCatalog(sparkSession),
-    functionRegistry,
-    conf,
-    hadoopConf,
-    parser,
-    functionResourceLoader
-  ) with CarbonSessionCatalog {
-
-  private lazy val carbonEnv = {
-    val env = new CarbonEnv
-    env.init(sparkSession)
-    env
-  }
-  /**
-   * return's the carbonEnv instance
-   * @return
-   */
-  override def getCarbonEnv() : CarbonEnv = {
-    carbonEnv
-  }
-
-  // Initialize all listeners to the Operation bus.
-  CarbonEnv.initListeners()
-
-  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
-    val rtnRelation = super.lookupRelation(name)
-    val isRelationRefreshed =
-      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
-    if (isRelationRefreshed) {
-      super.lookupRelation(name)
-    } else {
-      rtnRelation
-    }
-  }
-
-  /**
-   * returns hive client from HiveExternalCatalog
-   *
-   * @return
-   */
-  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
-    sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
-      .asInstanceOf[HiveExternalCatalog].client
-  }
-
-  def alterTableRename(oldTableIdentifier: TableIdentifier,
-      newTableIdentifier: TableIdentifier,
-      newTablePath: String): Unit = {
-    getClient().runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
-      s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
-    getClient().runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
-      s"SET SERDEPROPERTIES" +
-      s"('tableName'='${ newTableIdentifier.table }', " +
-      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
-  }
-
-  override def alterTable(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    getClient()
-      .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${ tableIdentifier.table } " +
-                  s"SET TBLPROPERTIES(${ schemaParts })")
-  }
-
-  override def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    alterTable(tableIdentifier, schemaParts, cols)
-  }
-
-  override def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    alterTable(tableIdentifier, schemaParts, cols)
-  }
-
-  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-  : Unit = {
-    alterTable(tableIdentifier, schemaParts, cols)
-  }
-
-  override def createPartitions(
-      tableName: TableIdentifier,
-      parts: Seq[CatalogTablePartition],
-      ignoreIfExists: Boolean): Unit = {
-    try {
-      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
-      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
-      super.createPartitions(tableName, updatedParts, ignoreIfExists)
-    } catch {
-      case e: Exception =>
-        super.createPartitions(tableName, parts, ignoreIfExists)
-    }
-  }
-
-  /**
-   * This is alternate way of getting partition information. It first fetches all partitions from
-   * hive and then apply filter instead of querying hive along with filters.
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier) = {
-    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
-  }
-
-  /**
-   * Update the storageformat with new location information
-   */
-  override def updateStorageLocation(
-      path: Path,
-      storage: CatalogStorageFormat,
-      newTableName: String,
-      dbName: String): CatalogStorageFormat = {
-    storage.copy(locationUri = Some(path.toUri))
-  }
-}
-
-/**
- * Session state implementation to override sql parser and adding strategies
- *
- * @param sparkSession
- */
-class CarbonSessionStateBuilder(sparkSession: SparkSession,
-    parentState: Option[SessionState] = None)
-  extends HiveSessionStateBuilder(sparkSession, parentState) {
-
-  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
-  experimentalMethods.extraStrategies =
-    Seq(new StreamingTableStrategy(sparkSession),
-        new CarbonLateDecodeStrategy,
-        new DDLStrategy(sparkSession)
-    )
-  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
-    new CarbonUDFTransformRule,
-    new CarbonLateDecodeRule)
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  /**
-   * Create a [[CarbonSessionStateBuilder]].
-   */
-  override protected lazy val catalog: CarbonHiveSessionCatalog = {
-    val catalog = new CarbonHiveSessionCatalog(
-      externalCatalog,
-      session.sharedState.globalTempViewManager,
-      functionRegistry,
-      sparkSession,
-      conf,
-      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
-      sqlParser,
-      resourceLoader)
-    parentState.foreach(_.catalog.copyStateTo(catalog))
-    catalog
-  }
-
-  private def externalCatalog: HiveExternalCatalog =
-    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
-
-  /**
-   * Create a Hive aware resource loader.
-   */
-  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
-    val client: HiveClient = externalCatalog.client.newSession()
-    new HiveSessionResourceLoader(session, client)
-  }
-
-  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
-  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
-    new Analyzer(catalog, conf) {
-
-      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
-        new ResolveHiveSerdeTable(session) +:
-        new FindDataSourceTable(session) +:
-        new ResolveSQLOnFile(session) +:
-        new CarbonIUDAnalysisRule(sparkSession) +:
-        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
-
-      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
-      PreWriteCheck :: HiveOnlyCheck :: Nil
-
-      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
-        new DetermineTableStats(session) +:
-        RelationConversions(conf, catalog) +:
-        PreprocessTableCreation(session) +:
-        PreprocessTableInsertion(conf) +:
-        DataSourceAnalysis(conf) +:
-        HiveAnalysis +:
-        customPostHocResolutionRules
-    }
-  )
-
-  override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
deleted file mode 100644
index ac702ea..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
-import org.apache.spark.util.CarbonReflectionUtils
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-
-/**
- * This class refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table.
- */
-object CarbonSessionUtil {
-
-  val LOGGER = LogServiceFactory.getLogService("CarbonSessionUtil")
-
-  /**
-   * The method refreshes the cache entry
-   *
-   * @param rtnRelation [[LogicalPlan]] represents the given table or view.
-   * @param name        tableName
-   * @param sparkSession
-   * @return
-   */
-  def refreshRelation(rtnRelation: LogicalPlan, name: TableIdentifier)
-    (sparkSession: SparkSession): Boolean = {
-    var isRelationRefreshed = false
-    rtnRelation match {
-      case SubqueryAlias(_,
-      LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)
-      ) =>
-        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
-      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
-        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
-      case SubqueryAlias(_, relation) if
-      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
-      relation.getClass.getName
-        .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
-      relation.getClass.getName.equals(
-        "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation"
-      ) =>
-        val catalogTable =
-          CarbonReflectionUtils.getFieldOfCatalogTable(
-            "tableMeta",
-            relation
-          ).asInstanceOf[CatalogTable]
-        isRelationRefreshed =
-          CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
-      case _ =>
-    }
-    isRelationRefreshed
-  }
-
-  /**
-   * This is alternate way of getting partition information. It first fetches all partitions from
-   * hive and then apply filter instead of querying hive along with filters.
-   *
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  def prunePartitionsByFilter(partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
-    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
-    ExternalCatalogUtils.prunePartitionsByFilter(
-      sparkSession.sessionState.catalog.getTableMetadata(identifier),
-      allPartitions,
-      partitionFilters,
-      sparkSession.sessionState.conf.sessionLocalTimeZone
-    )
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
index b8bf56d..7177f1a 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -18,24 +18,15 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.string
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel}
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
-import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParserUtil}
-import org.apache.spark.sql.types.DecimalType
-
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
 
 class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
-  extends SparkSqlAstBuilder(conf) {
+  extends SparkSqlAstBuilder(conf) with SqlAstBuilderHelper {
 
   val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
 
@@ -55,75 +46,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
     }
   }
 
-  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
-
-    val newColumn = visitColType(ctx.colType)
-    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
-      throw new MalformedCarbonCommandException(
-        "Column names provided are different. Both the column names should be same")
-    }
-
-    val (typeString, values) : (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
-      case d:DecimalType => ("decimal", Some(List((d.precision, d.scale))))
-      case _ => (newColumn.dataType.typeName.toLowerCase, None)
-    }
-
-    val alterTableChangeDataTypeModel =
-      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
-        new CarbonSpark2SqlParser()
-          .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
-        ctx.tableIdentifier().table.getText.toLowerCase,
-        ctx.identifier.getText.toLowerCase,
-        newColumn.name.toLowerCase)
-
-    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
-  }
-
-
   override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
-    val fields = parser.getFields(cols)
-    val tblProperties = scala.collection.mutable.Map.empty[String, String]
-    val tableModel = new CarbonSpark2SqlParser().prepareTableModel (false,
-      new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
-        .map(_.getText)),
-      ctx.tableIdentifier.table.getText.toLowerCase,
-      fields,
-      Seq.empty,
-      tblProperties,
-      None,
-      true)
-
-    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
-      Option(ctx.tableIdentifier().db).map(_.getText),
-      ctx.tableIdentifier.table.getText,
-      tblProperties.toMap,
-      tableModel.dimCols,
-      tableModel.msrCols,
-      tableModel.highcardinalitydims.getOrElse(Seq.empty))
-
-    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
-  }
-
-  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
-    super.visitCreateTable(ctx)
-  }
-
-  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
-    withOrigin(ctx) {
-      if (CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
-          CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
-        super.visitShowTables(ctx)
-      } else {
-        CarbonShowTablesCommand(
-          Option(ctx.db).map(_.getText),
-          Option(ctx.pattern).map(string))
-      }
-    }
-  }
-
-  override def visitExplain(ctx: SqlBaseParser.ExplainContext): LogicalPlan = {
-    CarbonExplainCommand(super.visitExplain(ctx))
+    visitAddTableColumns(parser,ctx)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala
deleted file mode 100644
index 2128ffd..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf.buildConf
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * To initialize dynamic values default param
- */
-class CarbonSQLConf(sparkSession: SparkSession) {
-
-  val carbonProperties = CarbonProperties.getInstance()
-
-  /**
-   * To initialize dynamic param defaults along with usage docs
-   */
-  def addDefaultCarbonParams(): Unit = {
-    val ENABLE_UNSAFE_SORT =
-      buildConf(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
-        .doc("To enable/ disable unsafe sort.")
-        .booleanConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
-      buildConf(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
-        .doc("To set carbon task distribution.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    val BAD_RECORDS_LOGGER_ENABLE =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
-        .doc("To enable/ disable carbon bad record logger.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants
-          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    val BAD_RECORDS_ACTION =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
-        .doc("To configure the bad records action.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    val IS_EMPTY_DATA_BAD_RECORD =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
-        .doc("Property to decide weather empty data to be considered bad/ good record.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
-          .toBoolean)
-    val SORT_SCOPE =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
-        .doc("Property to specify sort scope.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    val BATCH_SORT_SIZE_INMB =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
-        .doc("Property to specify batch sort size in MB.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
-    val SINGLE_PASS =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
-        .doc("Property to enable/disable single_pass.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
-    val BAD_RECORD_PATH =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
-        .doc("Property to configure the bad record location.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    val GLOBAL_SORT_PARTITIONS =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
-        .doc("Property to configure the global sort partitions.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    val DATEFORMAT =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
-        .doc("Property to configure data format for date type columns.")
-        .stringConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-    val CARBON_INPUT_SEGMENTS = buildConf(
-      "carbon.input.segments.<database_name>.<table_name>")
-      .doc("Property to configure the list of segments to query.").stringConf
-      .createWithDefault(carbonProperties
-        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
-  }
-  /**
-   * to set the dynamic properties default values
-   */
-  def addDefaultCarbonSessionParams(): Unit = {
-    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-      carbonProperties
-        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
deleted file mode 100644
index 4e22d13..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.net.URI
-
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
-import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
-import org.apache.spark.sql.sources.BaseRelation
-
-/**
- * Create table 'using carbondata' and insert the query result into it.
- *
- * @param table the Catalog Table
- * @param mode  SaveMode:Ignore,OverWrite,ErrorIfExists,Append
- * @param query the query whose result will be insert into the new relation
- *
- */
-
-case class CreateCarbonSourceTableAsSelectCommand(
-    table: CatalogTable,
-    mode: SaveMode,
-    query: LogicalPlan)
-  extends RunnableCommand {
-
-  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    assert(table.tableType != CatalogTableType.VIEW)
-    assert(table.provider.isDefined)
-
-    val sessionState = sparkSession.sessionState
-    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = table.identifier.copy(database = Some(db))
-    val tableName = tableIdentWithDB.unquotedString
-
-    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
-      assert(mode != SaveMode.Overwrite,
-        s"Expect the table $tableName has been dropped when the save mode is Overwrite")
-
-      if (mode == SaveMode.ErrorIfExists) {
-        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
-      }
-      if (mode == SaveMode.Ignore) {
-        // Since the table already exists and the save mode is Ignore, we will just return.
-        return Seq.empty
-      }
-
-      saveDataIntoTable(
-        sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
-    } else {
-      assert(table.schema.isEmpty)
-
-      val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
-        Some(sessionState.catalog.defaultTablePath(table.identifier))
-      } else {
-        table.storage.locationUri
-      }
-      val result = saveDataIntoTable(
-        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
-
-      result match {
-        case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
-                                     sparkSession.sqlContext.conf.manageFilesourcePartitions =>
-          // Need to recover partitions into the metastore so our saved data is visible.
-          sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
-        case _ =>
-      }
-    }
-
-    Seq.empty[Row]
-  }
-
-  private def saveDataIntoTable(
-      session: SparkSession,
-      table: CatalogTable,
-      tableLocation: Option[URI],
-      data: LogicalPlan,
-      mode: SaveMode,
-      tableExists: Boolean): BaseRelation = {
-    // Create the relation based on the input logical plan: `data`.
-    val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
-    val dataSource = DataSource(
-      session,
-      className = table.provider.get,
-      partitionColumns = table.partitionColumnNames,
-      bucketSpec = table.bucketSpec,
-      options = table.storage.properties ++ pathOption,
-      catalogTable = if (tableExists) {
-        Some(table)
-      } else {
-        None
-      })
-
-    try {
-      dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
-    } catch {
-      case ex: AnalysisException =>
-        logError(s"Failed to write to table ${ table.identifier.unquotedString }", ex)
-        throw ex
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
new file mode 100644
index 0000000..cec4c36
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapater {
+
+  def addSparkListener(sparkContext: SparkContext) = {
+    sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        SparkSession.setDefaultSession(null)
+      }
+    })
+  }
+
+  def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+                               metadata: Metadata, exprId: ExprId, qualifier: Option[String],
+                               attrRef : NamedExpression): AttributeReference = {
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qualifier)
+  }
+
+  def createAliasRef(child: Expression,
+                     name: String,
+                     exprId: ExprId = NamedExpression.newExprId,
+                     qualifier: Option[String] = None,
+                     explicitMetadata: Option[Metadata] = None,
+                     namedExpr : Option[NamedExpression] = None ) : Alias = {
+
+      Alias(child, name)(exprId, qualifier, explicitMetadata)
+  }
+
+  def getExplainCommandObj() : ExplainCommand = {
+    ExplainCommand(OneRowRelation())
+  }
+
+  /**
+   * As a part of SPARK-24085 Hive tables supports scala subquery for
+   * parition tables,so Carbon also needs to supports
+   * @param partitionSet
+   * @param filterPredicates
+   * @return
+   */
+  def getPartitionKeyFilter(
+      partitionSet: AttributeSet,
+      filterPredicates: Seq[Expression]): ExpressionSet = {
+    ExpressionSet(
+      ExpressionSet(filterPredicates)
+        .filterNot(SubqueryExpression.hasSubquery)
+        .filter(_.references.subsetOf(partitionSet)))
+  }
+
+  // As per SPARK-22520 OptimizeCodegen is removed in 2.3.1
+  def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
+    Seq.empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java
deleted file mode 100644
index 783a528..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql;
-
-import java.math.BigInteger;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.ColumnVectorFactory;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.sql.types.*;
-import org.apache.spark.sql.vectorized.ColumnVector;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Adapter class which handles the columnar vector reading of the carbondata
- * based on the spark ColumnVector and ColumnarBatch API. This proxy class
- * handles the complexity of spark 2.3 version related api changes since
- * spark ColumnVector and ColumnarBatch interfaces are still evolving.
- */
-public class CarbonVectorProxy {
-
-    private ColumnarBatch columnarBatch;
-    private WritableColumnVector[] writableColumnVectors;
-
-    /**
-     * Adapter class which handles the columnar vector reading of the carbondata
-     * based on the spark ColumnVector and ColumnarBatch API. This proxy class
-     * handles the complexity of spark 2.3 version related api changes since
-     * spark ColumnVector and ColumnarBatch interfaces are still evolving.
-     *
-     * @param memMode       which represent the type onheap or offheap vector.
-     * @param rowNum        rows number for vector reading
-     * @param structFileds, metadata related to current schema of table.
-     */
-    public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
-        writableColumnVectors = ColumnVectorFactory
-                .getColumnVector(memMode, new StructType(structFileds), rowNum);
-        columnarBatch = new ColumnarBatch(writableColumnVectors);
-        columnarBatch.setNumRows(rowNum);
-    }
-
-    public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
-        writableColumnVectors = ColumnVectorFactory
-                .getColumnVector(memMode, outputSchema, rowNum);
-        columnarBatch = new ColumnarBatch(writableColumnVectors);
-        columnarBatch.setNumRows(rowNum);
-    }
-
-    /**
-     * Returns the number of rows for read, including filtered rows.
-     */
-    public int numRows() {
-        return columnarBatch.numRows();
-    }
-
-    public Object reserveDictionaryIds(int capacity, int ordinal) {
-        return writableColumnVectors[ordinal].reserveDictionaryIds(capacity);
-    }
-
-    /**
-     * This API will return a columnvector from a batch of column vector rows
-     * based on the ordinal
-     *
-     * @param ordinal
-     * @return
-     */
-    public ColumnVector column(int ordinal) {
-        return columnarBatch.column(ordinal);
-    }
-
-    /**
-     * Resets this column for writing. The currently stored values are no longer accessible.
-     */
-    public void reset() {
-        for (WritableColumnVector col : writableColumnVectors) {
-            col.reset();
-        }
-    }
-
-    public void resetDictionaryIds(int ordinal) {
-        writableColumnVectors[ordinal].getDictionaryIds().reset();
-    }
-
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public InternalRow getRow(int rowId) {
-        return columnarBatch.getRow(rowId);
-    }
-
-
-    /**
-     * Returns the row in this batch at `rowId`. Returned row is reused across calls.
-     */
-    public Object getColumnarBatch() {
-        return columnarBatch;
-    }
-
-    /**
-     * Called to close all the columns in this batch. It is not valid to access the data after
-     * calling this. This must be called at the end to clean up memory allocations.
-     */
-    public void close() {
-        columnarBatch.close();
-    }
-
-    /**
-     * Sets the number of rows in this batch.
-     */
-    public void setNumRows(int numRows) {
-        columnarBatch.setNumRows(numRows);
-    }
-
-    /**
-     * This method will add the row to the corresponding column vector object
-     *
-     * @param rowId
-     * @param value
-     */
-    public void putRowToColumnBatch(int rowId, Object value, int offset) {
-            org.apache.spark.sql.types.DataType t = dataType(offset);
-            if (null == value) {
-                putNull(rowId, offset);
-            } else {
-                if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-                    putBoolean(rowId, (boolean) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-                    putByte(rowId, (byte) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-                    putShort(rowId, (short) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-                    putInt(rowId, (int) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-                    putLong(rowId, (long) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-                    putFloat(rowId, (float) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-                    putDouble(rowId, (double) value, offset);
-                } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-                    UTF8String v = (UTF8String) value;
-                    putByteArray(rowId, v.getBytes(), offset);
-                } else if (t instanceof DecimalType) {
-                    DecimalType dt = (DecimalType) t;
-                    Decimal d = Decimal.fromDecimal(value);
-                    if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-                        putInt(rowId, (int) d.toUnscaledLong(), offset);
-                    } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-                        putLong(rowId, d.toUnscaledLong(), offset);
-                    } else {
-                        final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-                        byte[] bytes = integer.toByteArray();
-                        putByteArray(rowId, bytes, 0, bytes.length, offset);
-                    }
-                } else if (t instanceof CalendarIntervalType) {
-                    CalendarInterval c = (CalendarInterval) value;
-                    writableColumnVectors[offset].getChild(0).putInt(rowId, c.months);
-                    writableColumnVectors[offset].getChild(1).putLong(rowId, c.microseconds);
-                } else if (t instanceof org.apache.spark.sql.types.DateType) {
-                    putInt(rowId, (int) value, offset);
-                } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-                    putLong(rowId, (long) value, offset);
-                }
-            }
-    }
-
-    public void putBoolean(int rowId, boolean value, int ordinal) {
-        writableColumnVectors[ordinal].putBoolean(rowId, (boolean) value);
-    }
-
-    public void putByte(int rowId, byte value, int ordinal) {
-        writableColumnVectors[ordinal].putByte(rowId, (byte) value);
-    }
-
-    public void putShort(int rowId, short value, int ordinal) {
-        writableColumnVectors[ordinal].putShort(rowId, (short) value);
-    }
-
-    public void putInt(int rowId, int value, int ordinal) {
-        writableColumnVectors[ordinal].putInt(rowId, (int) value);
-    }
-
-    public void putFloat(int rowId, float value, int ordinal) {
-        writableColumnVectors[ordinal].putFloat(rowId, (float) value);
-    }
-
-    public void putLong(int rowId, long value, int ordinal) {
-        writableColumnVectors[ordinal].putLong(rowId, (long) value);
-    }
-
-    public void putDouble(int rowId, double value, int ordinal) {
-        writableColumnVectors[ordinal].putDouble(rowId, (double) value);
-    }
-
-    public void putByteArray(int rowId, byte[] value, int ordinal) {
-        writableColumnVectors[ordinal].putByteArray(rowId, (byte[]) value);
-    }
-
-    public void putInts(int rowId, int count, int value, int ordinal) {
-        writableColumnVectors[ordinal].putInts(rowId, count, value);
-    }
-
-    public void putShorts(int rowId, int count, short value, int ordinal) {
-        writableColumnVectors[ordinal].putShorts(rowId, count, value);
-    }
-
-    public void putLongs(int rowId, int count, long value, int ordinal) {
-        writableColumnVectors[ordinal].putLongs(rowId, count, value);
-    }
-
-    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
-        writableColumnVectors[ordinal].putDecimal(rowId, value, precision);
-
-    }
-
-    public void putDoubles(int rowId, int count, double value, int ordinal) {
-        writableColumnVectors[ordinal].putDoubles(rowId, count, value);
-    }
-
-    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
-        writableColumnVectors[ordinal].putByteArray(rowId, (byte[]) value, offset, length);
-    }
-
-    public void putNull(int rowId, int ordinal) {
-        writableColumnVectors[ordinal].putNull(rowId);
-    }
-
-    public void putNulls(int rowId, int count, int ordinal) {
-        writableColumnVectors[ordinal].putNulls(rowId, count);
-    }
-
-    public boolean isNullAt(int rowId, int ordinal) {
-        return writableColumnVectors[ordinal].isNullAt(rowId);
-    }
-
-    public DataType dataType(int ordinal) {
-        return writableColumnVectors[ordinal].dataType();
-    }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java
deleted file mode 100644
index 6fe5ede..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
-import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
-import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-
-public class ColumnVectorFactory {
-
-
-    public static WritableColumnVector[] getColumnVector(MemoryMode memMode, StructType outputSchema, int rowNums) {
-
-
-        WritableColumnVector[] writableColumnVectors = null;
-        switch (memMode) {
-            case ON_HEAP:
-                writableColumnVectors = OnHeapColumnVector
-                        .allocateColumns(rowNums, outputSchema);
-                break;
-            case OFF_HEAP:
-                writableColumnVectors = OffHeapColumnVector
-                        .allocateColumns(rowNums, outputSchema);
-                break;
-        }
-        return writableColumnVectors;
-    }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala
new file mode 100644
index 0000000..f1b632b
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.ExperimentalMethods
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.internal.SQLConf
+
+
+class CarbonOptimizer(
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends SparkOptimizer(catalog, experimentalMethods) {
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
+    super.execute(transFormedPlan)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
new file mode 100644
index 0000000..73b6790
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.parser.ParserUtils.string
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParserUtil}
+
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) with SqlAstBuilderHelper {
+
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
+
+  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
+    val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat(0))
+
+    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+        fileStorage.equalsIgnoreCase("carbondata") ||
+        fileStorage.equalsIgnoreCase("'carbonfile'") ||
+        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec(0),
+        ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),ctx.locationSpec(0),
+        Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
+      helper.createCarbonTable(createTableTuple)
+    } else {
+      super.visitCreateHiveTable(ctx)
+    }
+  }
+
+  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
+    visitAddTableColumns(parser,ctx)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java b/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java
new file mode 100644
index 0000000..f357e41
--- /dev/null
+++ b/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.streaming.CarbonStreamInputFormat;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamRecordReaderTest extends TestCase {
+
+  private TaskAttemptID taskAttemptId;
+  private TaskAttemptContext taskAttemptContext;
+  private Configuration hadoopConf;
+  private AbsoluteTableIdentifier identifier;
+  private String tablePath;
+
+
+  @Override protected void setUp() throws Exception {
+    tablePath = new File("target/stream_input").getCanonicalPath();
+    String dbName = "default";
+    String tableName = "stream_table_input";
+    identifier = AbsoluteTableIdentifier.from(
+        tablePath,
+        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+    JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
+    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+    taskAttemptId = new TaskAttemptID(taskId, 0);
+
+    hadoopConf = new Configuration();
+    taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+  }
+
+  private InputSplit buildInputSplit() throws IOException {
+    CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
+    List<CarbonInputSplit> splitList = new ArrayList<>();
+    splitList.add(carbonInputSplit);
+    return new CarbonMultiBlockSplit(splitList, new String[] { "localhost" },
+        FileFormat.ROW_V1);
+  }
+
+  @Test public void testCreateRecordReader() {
+    try {
+      InputSplit inputSplit = buildInputSplit();
+      CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
+      RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
+      Assert.assertNotNull("Failed to create record reader", recordReader);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertTrue(e.getMessage(), false);
+    }
+  }
+
+  @Override protected void tearDown() throws Exception {
+    super.tearDown();
+    if (tablePath != null) {
+      FileFactory.deleteAllFilesOfDir(new File(tablePath));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
index cdcf018..466475b 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
@@ -80,7 +80,7 @@ class BooleanDataTypesFilterTest extends QueryTest with BeforeAndAfterEach with
     checkAnswer(sql("select count(*) from carbon_table where booleanField = true"),
       Row(4))
 
-    if (!Spark2TestQueryExecutor.spark.version.startsWith("2.2")) {
+    if (Spark2TestQueryExecutor.spark.version.startsWith("2.1")) {
       checkAnswer(sql("select count(*) from carbon_table where booleanField = 'true'"),
         Row(0))
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
index c220f1c..4050fd8 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
@@ -283,7 +283,7 @@ class BooleanDataTypesLoadTest extends QueryTest with BeforeAndAfterEach with Be
     checkAnswer(sql("select count(*) from boolean_table where booleanField = false or booleanField = true"),
       Row(10))
 
-    if (!Spark2TestQueryExecutor.spark.version.startsWith("2.2")) {
+    if (Spark2TestQueryExecutor.spark.version.startsWith("2.1")) {
       checkAnswer(sql("select count(*) from boolean_table where booleanField = 'true'"),
         Row(0))
 
@@ -373,7 +373,7 @@ class BooleanDataTypesLoadTest extends QueryTest with BeforeAndAfterEach with Be
     checkAnswer(sql("select count(*) from boolean_table where booleanField = false or booleanField = true"),
       Row(10))
 
-    if (!Spark2TestQueryExecutor.spark.version.startsWith("2.2")) {
+    if (Spark2TestQueryExecutor.spark.version.startsWith("2.1")) {
       checkAnswer(sql("select count(*) from boolean_table where booleanField = 'true'"),
         Row(0))
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 363db65..8aa0cf6 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -19,7 +19,7 @@ package org.apache.spark.carbondata.bucketing
 
 import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.common.util.Spark2QueryTest
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.Exchange
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -128,7 +128,11 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       """.stripMargin).queryExecution.executedPlan
     var shuffleExists = false
     plan.collect {
-      case s: ShuffleExchange => shuffleExists = true
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
     }
     assert(shuffleExists, "shuffle should exist on non bucket tables")
   }
@@ -146,7 +150,11 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       """.stripMargin).queryExecution.executedPlan
     var shuffleExists = false
     plan.collect {
-      case s: ShuffleExchange => shuffleExists = true
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
     }
     assert(!shuffleExists, "shuffle should not exist on bucket tables")
   }
@@ -171,7 +179,11 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       """.stripMargin).queryExecution.executedPlan
     var shuffleExists = false
     plan.collect {
-      case s: ShuffleExchange => shuffleExists = true
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
     }
     assert(!shuffleExists, "shuffle should not exist on bucket tables")
     sql("DROP TABLE bucketed_parquet_table")
@@ -196,7 +208,11 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       """.stripMargin).queryExecution.executedPlan
     var shuffleExists = false
     plan.collect {
-      case s: ShuffleExchange => shuffleExists = true
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
     }
     assert(shuffleExists, "shuffle should exist on non bucket tables")
     sql("DROP TABLE parquet_table")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
index 66cf675..089dfd2 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.carbondata.query
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.Spark2QueryTest
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.scalatest.BeforeAndAfterAll