You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/10/19 10:33:09 UTC

[4/9] carbondata git commit: [CARBONDATA-1597] Remove spark1 integration

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala
deleted file mode 100644
index cd42fba..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.SqlParser
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-
-
-/**
- * This class contains all carbon hive metadata related utilities
- */
-object CarbonHiveMetadataUtil {
-
-  @transient
-  val LOGGER = LogServiceFactory.getLogService(CarbonHiveMetadataUtil.getClass.getName)
-
-
-  /**
-   * This method invalidates the table from HiveMetastoreCatalog before dropping table
-   *
-   * @param databaseName
-   * @param tableName
-   * @param sqlContext
-   */
-  def invalidateAndDropTable(databaseName: String,
-      tableName: String,
-      sqlContext: SQLContext): Unit = {
-    val hiveContext = sqlContext.asInstanceOf[HiveContext]
-    val tableWithDb = databaseName + "." + tableName
-    val tableIdent = SqlParser.parseTableIdentifier(tableWithDb)
-    try {
-      hiveContext.catalog.invalidateTable(tableIdent)
-      hiveContext.runSqlHive(s"DROP TABLE IF EXISTS $databaseName.$tableName")
-    } catch {
-      case e: Exception =>
-        LOGGER.audit(
-          s"Error While deleting the table $databaseName.$tableName during drop carbon table" +
-          e.getMessage)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
deleted file mode 100644
index 4d5d39a..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ /dev/null
@@ -1,562 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io._
-import java.util.UUID
-
-import scala.Array.canBuildFrom
-import scala.collection.mutable.ArrayBuffer
-import scala.language.implicitConversions
-import scala.util.parsing.combinator.RegexParsers
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.Partitioner
-import org.apache.spark.sql.hive.client.ClientInterface
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.reader.ThriftReader
-import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.core.writer.ThriftWriter
-import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.processing.merger.TableMeta
-
-case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])
-
-case class CarbonMetaData(dims: Seq[String],
-    msrs: Seq[String],
-    carbonTable: CarbonTable,
-    dictionaryMap: DictionaryMap)
-
-object CarbonMetastore {
-
-  def readSchemaFileToThriftTable(schemaFilePath: String): TableInfo = {
-    val createTBase = new ThriftReader.TBaseCreator() {
-      override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
-        new TableInfo()
-      }
-    }
-    val thriftReader = new ThriftReader(schemaFilePath, createTBase)
-    var tableInfo: TableInfo = null
-    try {
-      thriftReader.open()
-      tableInfo = thriftReader.read().asInstanceOf[TableInfo]
-    } finally {
-      thriftReader.close()
-    }
-    tableInfo
-  }
-
-  def writeThriftTableToSchemaFile(schemaFilePath: String, tableInfo: TableInfo): Unit = {
-    val thriftWriter = new ThriftWriter(schemaFilePath, false)
-    try {
-      thriftWriter.open()
-      thriftWriter.write(tableInfo);
-    } finally {
-      thriftWriter.close()
-    }
-  }
-
-}
-
-case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
-  def get(name: String): Option[Boolean] = {
-    dictionaryMap.get(name.toLowerCase)
-  }
-}
-
-class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
-    client: ClientInterface, queryId: String) extends HiveMetastoreCatalog(client, hiveContext) {
-
-  @transient
-  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
-
-  val tableModifiedTimeStore = new java.util.HashMap[String, Long]()
-  tableModifiedTimeStore
-    .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis())
-
-  val metadata = loadMetadata(storePath)
-
-  def getTableCreationTime(databaseName: String, tableName: String): Long = {
-    val tableMeta = metadata.tablesMeta.filter(
-      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(databaseName) &&
-           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
-    val tableCreationTime = tableMeta.head.carbonTable.getTableLastUpdatedTime
-    tableCreationTime
-  }
-
-  def lookupRelation1(dbName: Option[String],
-      tableName: String)(sqlContext: SQLContext): LogicalPlan = {
-    lookupRelation1(TableIdentifier(tableName, dbName))(sqlContext)
-  }
-
-  def lookupRelation1(tableIdentifier: TableIdentifier,
-      alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = {
-    checkSchemasModifiedTimeAndReloadTables()
-    val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
-    val tables = getTableFromMetadata(database, tableIdentifier.table)
-    tables match {
-      case Some(t) =>
-        CarbonRelation(database, tableIdentifier.table,
-          CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head, alias)(sqlContext)
-      case None =>
-        LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
-        throw new NoSuchTableException
-    }
-  }
-
-  /**
-   * This method will search for a table in the catalog metadata
-   *
-   * @param database
-   * @param tableName
-   * @return
-   */
-  def getTableFromMetadata(database: String,
-      tableName: String): Option[TableMeta] = {
-    metadata.tablesMeta
-      .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-                 c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
-  }
-
-  def tableExists(identifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
-    checkSchemasModifiedTimeAndReloadTables()
-    val database = identifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
-    val tables = metadata.tablesMeta.filter(
-      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(identifier.table))
-    tables.nonEmpty
-  }
-
-  def loadMetadata(metadataPath: String): MetaData = {
-    val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
-    val statistic = new QueryStatistic()
-    // creating zookeeper instance once.
-    // if zookeeper is configured as carbon lock type.
-    val zookeeperurl = hiveContext.getConf(CarbonCommonConstants.ZOOKEEPER_URL, null)
-    if (null != zookeeperurl) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
-    }
-    if (metadataPath == null) {
-      return null
-    }
-    // if no locktype is configured and store type is HDFS set HDFS lock as default
-    if (null == CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
-        FileType.HDFS == FileFactory.getFileType(metadataPath)) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.LOCK_TYPE,
-          CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
-        )
-      LOGGER.info("Default lock type HDFSLOCK is configured")
-    }
-    val fileType = FileFactory.getFileType(metadataPath)
-    val metaDataBuffer = new ArrayBuffer[TableMeta]
-    fillMetaData(metadataPath, fileType, metaDataBuffer)
-    updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
-    statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
-      System.currentTimeMillis())
-    recorder.recordStatisticsForDriver(statistic, queryId)
-    MetaData(metaDataBuffer)
-  }
-
-  private def fillMetaData(basePath: String, fileType: FileType,
-      metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
-    val databasePath = basePath // + "/schemas"
-    try {
-      if (FileFactory.isFileExist(databasePath, fileType)) {
-        val file = FileFactory.getCarbonFile(databasePath, fileType)
-        val databaseFolders = file.listFiles()
-
-        databaseFolders.foreach(databaseFolder => {
-          if (databaseFolder.isDirectory) {
-            val dbName = databaseFolder.getName
-            val tableFolders = databaseFolder.listFiles()
-
-            tableFolders.foreach(tableFolder => {
-              if (tableFolder.isDirectory) {
-                val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
-                  tableFolder.getName, UUID.randomUUID().toString)
-                val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
-                  carbonTableIdentifier)
-                val tableMetadataFile = carbonTablePath.getSchemaFilePath
-
-                if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-                  val tableName = tableFolder.getName
-                  val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
-
-
-                  val createTBase = new ThriftReader.TBaseCreator() {
-                    override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
-                      new TableInfo()
-                    }
-                  }
-                  val thriftReader = new ThriftReader(tableMetadataFile, createTBase)
-                  thriftReader.open()
-                  val tableInfo: TableInfo = thriftReader.read().asInstanceOf[TableInfo]
-                  thriftReader.close()
-
-                  val schemaConverter = new ThriftWrapperSchemaConverterImpl
-                  val wrapperTableInfo = schemaConverter
-                    .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
-                  val schemaFilePath = CarbonStorePath
-                    .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
-                  wrapperTableInfo.setStorePath(storePath)
-                  wrapperTableInfo
-                    .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-                  CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-                  val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-                  metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
-                    null, carbonTable)
-                }
-              }
-            })
-          }
-        })
-      } else {
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
-      }
-    } catch {
-      case s: java.io.FileNotFoundException =>
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
-    }
-  }
-
-  /**
-   *
-   * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
-   * Load CarbonTable from wrapper tableinfo
-   *
-   */
-  def createTableFromThrift(
-      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
-      dbName: String, tableName: String, partitioner: Partitioner)
-    (sqlContext: SQLContext): String = {
-    if (tableExists(TableIdentifier(tableName, Some(dbName)))(sqlContext)) {
-      sys.error(s"Table [$tableName] already exists under Database [$dbName]")
-    }
-    val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val thriftTableInfo = schemaConverter
-      .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
-    val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
-    thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
-      .add(schemaEvolutionEntry)
-
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      tableInfo.getFactTable.getTableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
-    val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
-    tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(storePath)
-    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(carbonTableIdentifier, storePath, null,
-      CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName))
-
-    val fileType = FileFactory.getFileType(schemaMetadataPath)
-    if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
-      FileFactory.mkdirs(schemaMetadataPath, fileType)
-    }
-
-    val thriftWriter = new ThriftWriter(schemaFilePath, false)
-    thriftWriter.open()
-    thriftWriter.write(thriftTableInfo)
-    thriftWriter.close()
-
-    metadata.tablesMeta += tableMeta
-    logInfo(s"Table $tableName for Database $dbName created successfully.")
-    LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-    carbonTablePath.getPath
-  }
-
-  private def updateMetadataByWrapperTable(
-      wrapperTableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo): Unit = {
-
-    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-      wrapperTableInfo.getTableUniqueName)
-    for (i <- metadata.tablesMeta.indices) {
-      if (wrapperTableInfo.getTableUniqueName.equals(
-        metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
-        metadata.tablesMeta(i).carbonTable = carbonTable
-      }
-    }
-  }
-
-  def updateMetadataByThriftTable(schemaFilePath: String,
-      tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
-
-    tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
-      .setTime_stamp(System.currentTimeMillis())
-    val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
-    wrapperTableInfo
-      .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-    wrapperTableInfo.setStorePath(storePath)
-    updateMetadataByWrapperTable(wrapperTableInfo)
-  }
-
-  /**
-   * Shows all tables for given schema.
-   */
-  def getTables(databaseName: Option[String])(sqlContext: SQLContext): Seq[(String, Boolean)] = {
-
-    val dbName =
-      databaseName.getOrElse(sqlContext.asInstanceOf[HiveContext].catalog.client.currentDatabase)
-    checkSchemasModifiedTimeAndReloadTables()
-    metadata.tablesMeta.filter { c =>
-      c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName)
-    }.map { c => (c.carbonTableIdentifier.getTableName, false) }
-  }
-
-  def isTablePathExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
-    val dbName = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
-    val tableName = tableIdentifier.table
-
-    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getPath
-
-    val fileType = FileFactory.getFileType(tablePath)
-    FileFactory.isFileExist(tablePath, fileType)
-  }
-
-  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
-    (sqlContext: SQLContext) {
-    val dbName = tableIdentifier.database.get
-    val tableName = tableIdentifier.table
-
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
-
-    val fileType = FileFactory.getFileType(metadataFilePath)
-
-    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
-      // while drop we should refresh the schema modified time so that if any thing has changed
-      // in the other beeline need to update.
-      checkSchemasModifiedTimeAndReloadTables
-      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
-      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
-      val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
-        tableIdentifier.table)
-      metadataToBeRemoved match {
-        case Some(tableMeta) =>
-          metadata.tablesMeta -= tableMeta
-          CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
-          CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
-          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-        case None =>
-          logInfo(s"Metadata does not contain entry for table $tableName in database $dbName")
-      }
-      CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sqlContext)
-      // discard cached table info in cachedDataSourceTables
-      sqlContext.catalog.refreshTable(tableIdentifier)
-      DataMapStoreManager.getInstance().
-        clearDataMap(AbsoluteTableIdentifier.from(storePath, dbName, tableName))
-    }
-  }
-
-  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
-    val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
-    val timestampFileType = FileFactory.getFileType(timestampFile)
-    (timestampFile, timestampFileType)
-  }
-
-  /**
-   * This method will put the updated timestamp of schema file in the table modified time store map
-   *
-   * @param timeStamp
-   */
-  def updateSchemasUpdatedTime(timeStamp: Long) {
-    tableModifiedTimeStore.put("default", timeStamp)
-  }
-
-  /**
-   * This method will read the timestamp of empty schema file
-   *
-   * @param databaseName
-   * @param tableName
-   * @return
-   */
-  def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
-    } else {
-      System.currentTimeMillis()
-    }
-  }
-
-  /**
-   * This method will check and create an empty schema timestamp file
-   *
-   * @param databaseName
-   * @param tableName
-   * @return
-   */
-  def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
-    if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
-      FileFactory.createNewFile(timestampFile, timestampFileType)
-    }
-    val systemTime = System.currentTimeMillis()
-    FileFactory.getCarbonFile(timestampFile, timestampFileType)
-      .setLastModifiedTime(systemTime)
-    systemTime
-  }
-
-  def checkSchemasModifiedTimeAndReloadTables() {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
-        getLastModifiedTime ==
-            tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
-        refreshCache()
-      }
-    }
-  }
-
-  def refreshCache() {
-    metadata.tablesMeta = loadMetadata(storePath).tablesMeta
-  }
-
-  def getSchemaLastUpdatedTime(databaseName: String, tableName: String): Long = {
-    var schemaLastUpdatedTime = System.currentTimeMillis
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      schemaLastUpdatedTime = FileFactory.getCarbonFile(timestampFile, timestampFileType)
-        .getLastModifiedTime
-    }
-    schemaLastUpdatedTime
-  }
-
-  def createDatabaseDirectory(dbName: String) {
-    val databasePath = storePath + File.separator + dbName
-    val fileType = FileFactory.getFileType(databasePath)
-    FileFactory.mkdirs(databasePath, fileType)
-  }
-
-  def dropDatabaseDirectory(dbName: String) {
-    val databasePath = storePath + File.separator + dbName
-    val fileType = FileFactory.getFileType(databasePath)
-    if (FileFactory.isFileExist(databasePath, fileType)) {
-      val dbPath = FileFactory.getCarbonFile(databasePath, fileType)
-      CarbonUtil.deleteFoldersAndFiles(dbPath)
-    }
-  }
-
-}
-
-
-object CarbonMetastoreTypes extends RegexParsers {
-  protected lazy val primitiveType: Parser[DataType] =
-    "string" ^^^ StringType |
-    "float" ^^^ FloatType |
-    "int" ^^^ IntegerType |
-    "tinyint" ^^^ ShortType |
-    "short" ^^^ ShortType |
-    "double" ^^^ DoubleType |
-    "long" ^^^ LongType |
-    "binary" ^^^ BinaryType |
-    "boolean" ^^^ BooleanType |
-    fixedDecimalType |
-    "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
-    "varchar\\((\\d+)\\)".r ^^^ StringType |
-    "timestamp" ^^^ TimestampType |
-    "date" ^^^ DateType |
-    "char\\((\\d+)\\)".r ^^^ StringType
-
-  protected lazy val fixedDecimalType: Parser[DataType] =
-    "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
-      case precision ~ scale =>
-        DecimalType(precision.toInt, scale.toInt)
-    }
-
-  protected lazy val arrayType: Parser[DataType] =
-    "array" ~> "<" ~> dataType <~ ">" ^^ {
-      case tpe => ArrayType(tpe)
-    }
-
-  protected lazy val mapType: Parser[DataType] =
-    "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
-      case t1 ~ _ ~ t2 => MapType(t1, t2)
-    }
-
-  protected lazy val structField: Parser[StructField] =
-    "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
-      case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
-    }
-
-  protected lazy val structType: Parser[DataType] =
-    "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
-      case fields => StructType(fields)
-    }
-
-  protected lazy val dataType: Parser[DataType] =
-    arrayType |
-    mapType |
-    structType |
-    primitiveType
-
-  def toDataType(metastoreType: String): DataType = {
-    parseAll(dataType, metastoreType) match {
-      case Success(result, _) => result
-      case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType")
-    }
-  }
-
-  def toMetastoreType(dt: DataType): String = {
-    dt match {
-      case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
-      case StructType(fields) =>
-        s"struct<${
-          fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
-            .mkString(",")
-        }>"
-      case StringType => "string"
-      case FloatType => "float"
-      case IntegerType => "int"
-      case ShortType => "tinyint"
-      case DoubleType => "double"
-      case LongType => "bigint"
-      case BinaryType => "binary"
-      case BooleanType => "boolean"
-      case DecimalType() => "decimal"
-      case DateType => "date"
-      case TimestampType => "timestamp"
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
deleted file mode 100644
index 82c5f7f..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.CarbonSqlParser
-import org.apache.spark.sql.catalyst.ParserDialect
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
-private[spark] class CarbonSQLDialect(hiveContext: HiveContext) extends ParserDialect {
-
-  @transient
-  protected val sqlParser = new CarbonSqlParser
-
-  override def parse(sqlText: String): LogicalPlan = {
-
-    try {
-      sqlParser.parse(sqlText)
-    } catch {
-      // MalformedCarbonCommandException need to throw directly
-      // because hive can no parse carbon command
-      case ce: MalformedCarbonCommandException =>
-        throw ce
-      case _: Throwable =>
-        HiveQl.parseSql(sqlText)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
deleted file mode 100644
index d3d699a..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ScalaUDF, _}
-import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan}
-import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SparkPlan}
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
-import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand}
-import org.apache.spark.sql.hive.execution.command._
-import org.apache.spark.sql.optimizer.CarbonDecoderRelation
-import org.apache.spark.sql.types.{IntegerType, StringType}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
-class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
-
-  override def strategies: Seq[Strategy] = getStrategies
-
-  val LOGGER = LogServiceFactory.getLogService("CarbonStrategies")
-
-  def getStrategies: Seq[Strategy] = {
-    val total = sqlContext.planner.strategies :+ CarbonTableScan
-    total
-  }
-
-  /**
-   * Carbon strategies for performing late materizlization (decoding dictionary key
-   * as late as possbile)
-   */
-  private[sql] object CarbonTableScan extends Strategy {
-
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = {
-      plan match {
-        case PhysicalOperation(projectList, predicates, l: LogicalRelation)
-          if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
-          if (isStarQuery(plan)) {
-            carbonRawScanForStarQuery(projectList, predicates, l)(sqlContext) :: Nil
-          } else {
-            carbonRawScan(projectList, predicates, l)(sqlContext) :: Nil
-          }
-        case InsertIntoCarbonTable(relation: CarbonDatasourceRelation,
-        _, child: LogicalPlan, overwrite, _) =>
-          ExecutedCommand(LoadTableByInsert(relation, child, overwrite)) :: Nil
-        case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
-          CarbonDictionaryDecoder(relations,
-            profile,
-            aliasMap,
-            planLater(child))(sqlContext) :: Nil
-        case _ =>
-          Nil
-      }
-    }
-
-    /**
-     * Create carbon scan
-     */
-    private def carbonRawScan(projectListRaw: Seq[NamedExpression],
-        predicates: Seq[Expression],
-        logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
-
-      val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]
-      val tableName: String =
-        relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
-      // Check out any expressions are there in project list. if they are present then we need to
-      // decode them as well.
-
-      val projectList = projectListRaw.map {p =>
-        p.transform {
-          case CustomDeterministicExpression(exp) => exp
-        }
-      }.asInstanceOf[Seq[NamedExpression]]
-      val newProjectList = projectList.map {
-        case a@Alias(s: ScalaUDF, name)
-          if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
-             name.equalsIgnoreCase(
-               CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
-          AttributeReference(name, StringType, true)().withExprId(a.exprId)
-        case a@Alias(s: ScalaUDF, name)
-          if name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) =>
-          val reference =
-            AttributeReference(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
-              StringType, true)().withExprId(a.exprId)
-          val alias = a.transform {
-            case s: ScalaUDF =>
-              ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
-          }.asInstanceOf[Alias]
-          Alias(alias.child, alias.name)(alias.exprId, alias.qualifiers, alias.explicitMetadata)
-        case other => other
-      }
-      val projectSet = AttributeSet(newProjectList.flatMap(_.references))
-      val filterSet = AttributeSet(predicates.flatMap(_.references))
-      val scan = CarbonScan(projectSet.toSeq,
-        relation.carbonRelation,
-        predicates)(sqlContext)
-      newProjectList.map {
-        case attr: AttributeReference =>
-        case Alias(attr: AttributeReference, _) =>
-        case others =>
-          others.references.map { f =>
-            val dictionary = relation.carbonRelation.metaData.dictionaryMap.get(f.name)
-            if (dictionary.isDefined && dictionary.get) {
-              scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference])
-            }
-          }
-      }
-      val scanWithDecoder =
-        if (scan.attributesNeedToDecode.size() > 0) {
-          val decoder = getCarbonDecoder(logicalRelation,
-            sc,
-            tableName,
-            scan.attributesNeedToDecode.asScala.toSeq,
-            scan)
-          if (scan.unprocessedExprs.nonEmpty) {
-            val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
-            filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)
-          } else {
-            decoder
-          }
-        } else {
-          scan
-        }
-
-      if (projectList.map(_.toAttribute) == scan.columnProjection &&
-          projectSet.size == projectList.size &&
-          filterSet.subsetOf(projectSet)) {
-        // copied from spark pruneFilterProjectRaw
-        // When it is possible to just use column pruning to get the right projection and
-        // when the columns of this projection are enough to evaluate all filter conditions,
-        // just do a scan with no extra project.
-        scanWithDecoder
-      } else {
-        Project(newProjectList, scanWithDecoder)
-      }
-    }
-
-    /**
-     * Create carbon scan for star query
-     */
-    private def carbonRawScanForStarQuery(projectList: Seq[NamedExpression],
-        predicates: Seq[Expression],
-        logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
-      val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]
-      val tableName: String =
-        relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
-      // Check out any expressions are there in project list. if they are present then we need to
-      // decode them as well.
-      val projectExprsNeedToDecode = new java.util.HashSet[Attribute]()
-      val scan = CarbonScan(projectList.map(_.toAttribute),
-        relation.carbonRelation,
-        predicates,
-        useUnsafeCoversion = false)(sqlContext)
-      projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode)
-      val updatedAttrs = scan.columnProjection.map(attr =>
-        updateDataType(attr.asInstanceOf[AttributeReference], relation, projectExprsNeedToDecode))
-      scan.columnProjection = updatedAttrs
-      if (projectExprsNeedToDecode.size() > 0
-          && isDictionaryEncoded(projectExprsNeedToDecode.asScala.toSeq, relation)) {
-        val decoder = getCarbonDecoder(logicalRelation,
-          sc,
-          tableName,
-          projectExprsNeedToDecode.asScala.toSeq,
-          scan)
-        if (scan.unprocessedExprs.nonEmpty) {
-          val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
-          filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)
-        } else {
-          decoder
-        }
-      } else {
-        if (scan.unprocessedExprs.nonEmpty) {
-          val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
-          filterCondToAdd.map(Filter(_, scan)).getOrElse(scan)
-        } else {
-          scan
-        }
-      }
-    }
-
-    def getCarbonDecoder(logicalRelation: LogicalRelation,
-        sc: SQLContext,
-        tableName: String,
-        projectExprsNeedToDecode: Seq[Attribute],
-        scan: CarbonScan): CarbonDictionaryDecoder = {
-      val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
-        logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation])
-      val attrs = projectExprsNeedToDecode.map { attr =>
-        val newAttr = AttributeReference(attr.name,
-          attr.dataType,
-          attr.nullable,
-          attr.metadata)(attr.exprId, Seq(tableName))
-        relation.addAttribute(newAttr)
-        newAttr
-      }
-      CarbonDictionaryDecoder(Seq(relation), IncludeProfile(attrs),
-        CarbonAliasDecoderRelation(), scan)(sc)
-    }
-
-    def isDictionaryEncoded(projectExprsNeedToDecode: Seq[Attribute],
-        relation: CarbonDatasourceRelation): Boolean = {
-      var isEncoded = false
-      projectExprsNeedToDecode.foreach { attr =>
-        if (relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false)) {
-          isEncoded = true
-        }
-      }
-      isEncoded
-    }
-
-    def updateDataType(attr: AttributeReference,
-        relation: CarbonDatasourceRelation,
-        allAttrsNotDecode: util.Set[Attribute]): AttributeReference = {
-      if (relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false) &&
-          !allAttrsNotDecode.asScala.exists(p => p.name.equals(attr.name))) {
-        AttributeReference(attr.name,
-          IntegerType,
-          attr.nullable,
-          attr.metadata)(attr.exprId, attr.qualifiers)
-      } else {
-        attr
-      }
-    }
-
-    private def isStarQuery(plan: LogicalPlan) = {
-      plan match {
-        case LogicalFilter(condition, l: LogicalRelation)
-          if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
-          true
-        case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => true
-        case _ => false
-      }
-    }
-  }
-
-  object DDLStrategies extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case DropTable(tableName, ifNotExists)
-        if CarbonEnv.get.carbonMetastore
-          .isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) =>
-        val identifier = toTableIdentifier(tableName.toLowerCase)
-        ExecutedCommand(DropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil
-      case ShowLoadsCommand(databaseName, table, limit) =>
-        ExecutedCommand(ShowLoads(databaseName, table, limit, plan.output)) :: Nil
-      case LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
-      options, isOverwriteExist, inputSqlString, dataFrame, _) =>
-        val isCarbonTable = CarbonEnv.get.carbonMetastore
-          .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
-        if (isCarbonTable || options.nonEmpty) {
-          ExecutedCommand(LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
-            options, isOverwriteExist, inputSqlString, dataFrame)) :: Nil
-        } else {
-          ExecutedCommand(HiveNativeCommand(inputSqlString)) :: Nil
-        }
-      case alterTable@AlterTableCompaction(altertablemodel) =>
-        val isCarbonTable = CarbonEnv.get.carbonMetastore
-          .tableExists(TableIdentifier(altertablemodel.tableName,
-            altertablemodel.dbName))(sqlContext)
-        if (isCarbonTable) {
-          if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
-              altertablemodel.compactionType.equalsIgnoreCase("major")) {
-            ExecutedCommand(alterTable) :: Nil
-          } else {
-            throw new MalformedCarbonCommandException(
-              "Unsupported alter operation on carbon table")
-          }
-        } else {
-          ExecutedCommand(HiveNativeCommand(altertablemodel.alterSql)) :: Nil
-        }
-      case CreateDatabase(dbName, sql) =>
-        ExecutedCommand(CreateDatabaseCommand(dbName, HiveNativeCommand(sql))) :: Nil
-      case DropDatabase(dbName, isCascade, sql) =>
-        if (isCascade) {
-          ExecutedCommand(DropDatabaseCascadeCommand(dbName, HiveNativeCommand(sql))) :: Nil
-        } else {
-          ExecutedCommand(DropDatabaseCommand(dbName, HiveNativeCommand(sql))) :: Nil
-        }
-      case UseDatabase(sql) =>
-        ExecutedCommand(HiveNativeCommand(sql)) :: Nil
-      case d: HiveNativeCommand =>
-        try {
-          val resolvedTable = sqlContext.executePlan(CarbonHiveSyntax.parse(d.sql)).optimizedPlan
-          planLater(resolvedTable) :: Nil
-        } catch {
-          case ce: MalformedCarbonCommandException =>
-            throw ce
-          case ae: AnalysisException =>
-            throw ae
-          case e: Exception => ExecutedCommand(d) :: Nil
-        }
-      case DescribeFormattedCommand(sql, tblIdentifier) =>
-        val isTable = CarbonEnv.get.carbonMetastore
-          .tableExists(tblIdentifier)(sqlContext)
-        if (isTable) {
-          val describe =
-            LogicalDescribeCommand(UnresolvedRelation(tblIdentifier, None), isExtended = false)
-          val resolvedTable = sqlContext.executePlan(describe.table).analyzed
-          val resultPlan = sqlContext.executePlan(resolvedTable).executedPlan
-          ExecutedCommand(DescribeCommandFormatted(resultPlan, plan.output, tblIdentifier)) :: Nil
-        } else {
-          ExecutedCommand(HiveNativeCommand(sql)) :: Nil
-        }
-      case ShowPartitions(t) =>
-        val isCarbonTable = CarbonEnv.get.carbonMetastore
-          .tableExists(t)(sqlContext)
-        if (isCarbonTable) {
-          ExecutedCommand(ShowCarbonPartitionsCommand(t)) :: Nil
-        } else {
-          var tableName = t.table
-          var database = t.database
-          var sql: String = null
-          if (database.isEmpty) {
-            sql = s"show partitions $tableName"
-          } else {
-            sql = s"show partitions $database.$tableName"
-          }
-          ExecutedCommand(HiveNativeCommand(sql)) :: Nil
-        }
-      case _ =>
-        Nil
-    }
-
-    def toTableIdentifier(name: String): TableIdentifier = {
-      val identifier = name.split("\\.")
-      identifier match {
-        case Array(tableName) => TableIdentifier(tableName, None)
-        case Array(dbName, tableName) => TableIdentifier(tableName, Some(dbName))
-      }
-    }
-  }
-
-}
-
-object CarbonHiveSyntax {
-
-  @transient
-  protected val sqlParser = new CarbonSqlParser
-
-  def parse(sqlText: String): LogicalPlan = {
-    sqlParser.parse(sqlText)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/HiveQlWrapper.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/HiveQlWrapper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/HiveQlWrapper.scala
deleted file mode 100644
index 6b244f9..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/HiveQlWrapper.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.hive.ql.parse.ASTNode
-
-/**
- * Wrapper class for using the hiveQl class of hive.
- */
-object HiveQlWrapper {
-
-  def getAst(sql: String): ASTNode = {
-
-    HiveQl.getAst(sql)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/cli/CarbonSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/cli/CarbonSQLCLIDriver.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/cli/CarbonSQLCLIDriver.scala
deleted file mode 100644
index ad70027..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/cli/CarbonSQLCLIDriver.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive.cli
-
-import java.io.File
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.scheduler.StatsReportListener
-import org.apache.spark.sql.CarbonContext
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.hive.thriftserver.{SparkSQLCLIDriver, SparkSQLEnv}
-import org.apache.spark.util.Utils
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-
-object CarbonSQLCLIDriver {
-
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  var hiveContext: HiveContext = _
-  var sparkContext: SparkContext = _
-
-  def main(args: Array[String]): Unit = {
-    init()
-    SparkSQLEnv.sparkContext = sparkContext
-    SparkSQLEnv.hiveContext = hiveContext
-    SparkSQLCLIDriver.installSignalHandler()
-    SparkSQLCLIDriver.main(args)
-  }
-
-  def init() {
-    if (hiveContext == null) {
-      val sparkConf = new SparkConf(loadDefaults = true)
-      val maybeSerializer = sparkConf.getOption("spark.serializer")
-      val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking")
-      // If user doesn't specify the appName, we want to get [SparkSQL::localHostName] instead of
-      // the default appName [CarbonSQLCLIDriver] in cli or beeline.
-      val maybeAppName = sparkConf
-        .getOption("spark.app.name")
-        .filterNot(_ == classOf[SparkSQLCLIDriver].getName)
-      val maybeStorePath = sparkConf.getOption("spark.carbon.storepath")
-
-      sparkConf
-        .setAppName(maybeAppName.getOrElse(s"CarbonSparkSQL::${ Utils.localHostName() }"))
-        .set(
-          "spark.serializer",
-          maybeSerializer.getOrElse("org.apache.spark.serializer.KryoSerializer"))
-        .set(
-          "spark.kryo.referenceTracking",
-          maybeKryoReferenceTracking.getOrElse("false"))
-
-      sparkContext = new SparkContext(sparkConf)
-      sparkContext.addSparkListener(new StatsReportListener())
-      val path = System.getenv("CARBON_HOME") + "/bin/carbonsqlclistore"
-      val store = new File(path)
-      store.mkdirs()
-      hiveContext = new CarbonContext(sparkContext,
-        maybeStorePath.getOrElse(store.getCanonicalPath),
-        store.getCanonicalPath)
-
-      hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
-      hiveContext.hiveconf.getAllProperties.asScala.toSeq.sorted.foreach { case (k, v) =>
-        LOGGER.debug(s"HiveConf var: $k=$v")
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
deleted file mode 100644
index 0f42940..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution.command
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.execution.command.DropTableCommand
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
-
-private[hive] case class CreateDatabaseCommand(dbName: String,
-    command: HiveNativeCommand) extends RunnableCommand {
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    val rows = command.run(sqlContext)
-    CarbonEnv.get.carbonMetastore.createDatabaseDirectory(dbName)
-    rows
-  }
-}
-
-private[hive] case class DropDatabaseCommand(dbName: String,
-    command: HiveNativeCommand) extends RunnableCommand {
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    val rows = command.run(sqlContext)
-    CarbonEnv.get.carbonMetastore.dropDatabaseDirectory(dbName)
-    rows
-  }
-}
-
-private[hive] case class DropDatabaseCascadeCommand(dbName: String,
-    command: HiveNativeCommand) extends RunnableCommand {
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    val tablesInDB = CarbonEnv.get.carbonMetastore
-        .getTables(Some(dbName))(sqlContext).map(x => x._1)
-    val rows = command.run(sqlContext)
-    tablesInDB.foreach{tableName =>
-      DropTableCommand(true, Some(dbName), tableName).run(sqlContext)
-    }
-    CarbonEnv.get.carbonMetastore.dropDatabaseDirectory(dbName)
-    rows
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
deleted file mode 100644
index 2e45954..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ /dev/null
@@ -1,431 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, StartsWith, _}
-import org.apache.spark.sql.optimizer.AttributeReferenceWrapper
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
-import org.apache.carbondata.core.scan.expression.conditional.{GreaterThanEqualToExpression, LessThanExpression, _}
-import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * All filter conversions are done here.
- */
-object CarbonFilters {
-
-  /**
-   * Converts data sources filters to carbon filter predicates.
-   */
-  def createCarbonFilter(schema: StructType,
-      predicate: sources.Filter): Option[CarbonExpression] = {
-    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
-
-    def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
-      predicate match {
-
-        case sources.EqualTo(name, value) =>
-          Some(new EqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-        case sources.Not(sources.EqualTo(name, value)) =>
-          Some(new NotEqualsExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-
-        case sources.EqualNullSafe(name, value) =>
-          Some(new EqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-        case sources.Not(sources.EqualNullSafe(name, value)) =>
-          Some(new NotEqualsExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-
-        case sources.GreaterThan(name, value) =>
-          Some(new GreaterThanExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-        case sources.LessThan(name, value) =>
-          Some(new LessThanExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-        case sources.GreaterThanOrEqual(name, value) =>
-          Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-        case sources.LessThanOrEqual(name, value) =>
-          Some(new LessThanEqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-
-        case sources.In(name, values) =>
-          Some(new InExpression(getCarbonExpression(name),
-            new ListExpression(
-              convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
-        case sources.Not(sources.In(name, values)) =>
-          Some(new NotInExpression(getCarbonExpression(name),
-            new ListExpression(
-              convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
-
-        case sources.IsNull(name) =>
-          Some(new EqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, null), true))
-        case sources.IsNotNull(name) =>
-          Some(new NotEqualsExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, null), true))
-
-        case sources.And(lhs, rhs) =>
-          (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
-
-        case sources.Or(lhs, rhs) =>
-          for {
-            lhsFilter <- createFilter(lhs)
-            rhsFilter <- createFilter(rhs)
-          } yield {
-            new OrExpression(lhsFilter, rhsFilter)
-          }
-        case sources.StringStartsWith(name, value) if value.length > 0 =>
-          val l = new GreaterThanEqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value))
-          val maxValueLimit = value.substring(0, value.length - 1) +
-                              (value.charAt(value.length - 1).toInt + 1).toChar
-          val r = new LessThanExpression(
-            getCarbonExpression(name), getCarbonLiteralExpression(name, maxValueLimit))
-          Some(new AndExpression(l, r))
-        case _ => None
-      }
-    }
-
-    def getCarbonExpression(name: String) = {
-      new CarbonColumnExpression(name,
-        CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
-    }
-
-    def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
-      val dataTypeOfAttribute = CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name))
-      val dataType = if (Option(value).isDefined
-                         && dataTypeOfAttribute == CarbonDataTypes.STRING
-                         && value.isInstanceOf[Double]) {
-        CarbonDataTypes.DOUBLE
-      } else {
-        dataTypeOfAttribute
-      }
-      new CarbonLiteralExpression(value, dataType)
-    }
-
-    createFilter(predicate)
-  }
-
-
-  // Check out which filters can be pushed down to carbon, remaining can be handled in spark layer.
-  // Mostly dimension filters are only pushed down since it is faster in carbon.
-  // TODO - The Filters are first converted Intermediate sources filters expression and then these
-  // expressions are again converted back to CarbonExpression. Instead of two step process of
-  // evaluating the filters it can be merged into a single one.
-  /**
-   * Walks the given spark filter expressions and records in `attrList` the attributes of
-   * every predicate that cannot be translated to a `sources.Filter`.
-   *
-   * The translated filters are not returned; the method is called only for its effect on
-   * `attrList`.
-   *
-   * @param filters  spark catalyst filter expressions to inspect
-   * @param attrList receives the attributes of the predicates that could not be translated
-   * @param aliasMap consulted with `getOrElse(attr, attr)` before an attribute is recorded, so
-   *                 an attribute without an entry is recorded unchanged
-   */
-  def selectFilters(filters: Seq[Expression],
-      attrList: java.util.HashSet[AttributeReferenceWrapper],
-      aliasMap: CarbonAliasDecoderRelation): Unit = {
-
-    // Adds every attribute referenced inside `expr` to attrList.
-    def recordAttributes(expr: Expression): Unit = {
-      expr.collect {
-        case attr: AttributeReference =>
-          attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-      }
-    }
-
-    // `or` is true while translating below an Or. In that mode an untranslatable child does
-    // not record its attributes itself; the enclosing Or records the attributes of the whole
-    // disjunction when either side fails.
-    def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = {
-      expr match {
-        // The binder is deliberately not called `or`, so that `or = true` below is
-        // unambiguously the named boolean argument.
-        case orExpr @ Or(left, right) =>
-          val leftFilter = translate(left, or = true)
-          val rightFilter = translate(right, or = true)
-          if (leftFilter.isDefined && rightFilter.isDefined) {
-            Some(sources.Or(leftFilter.get, rightFilter.get))
-          } else {
-            recordAttributes(orExpr)
-            None
-          }
-
-        case And(left, right) =>
-          val leftFilter = translate(left, or)
-          val rightFilter = translate(right, or)
-          // Below an Or both sides are required; otherwise whichever side translated is kept.
-          if (or && (leftFilter.isEmpty || rightFilter.isEmpty)) {
-            None
-          } else {
-            (leftFilter ++ rightFilter).reduceOption(sources.And)
-          }
-
-        case EqualTo(a: Attribute, Literal(v, _)) =>
-          Some(sources.EqualTo(a.name, v))
-        case EqualTo(Literal(v, _), a: Attribute) =>
-          Some(sources.EqualTo(a.name, v))
-        case Not(EqualTo(a: Attribute, Literal(v, _))) =>
-          Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Literal(v, _), a: Attribute)) =>
-          Some(sources.Not(sources.EqualTo(a.name, v)))
-        case IsNotNull(a: Attribute) =>
-          Some(sources.IsNotNull(a.name))
-        case IsNull(a: Attribute) =>
-          Some(sources.IsNull(a.name))
-        case Not(In(a: Attribute, list)) if list.forall(_.isInstanceOf[Literal]) =>
-          val values = list.map(e => e.eval(EmptyRow))
-          Some(sources.Not(sources.In(a.name, values.toArray)))
-        case In(a: Attribute, list) if list.forall(_.isInstanceOf[Literal]) =>
-          val values = list.map(e => e.eval(EmptyRow))
-          Some(sources.In(a.name, values.toArray))
-        // With the literal on the left-hand side the comparison is mirrored.
-        case GreaterThan(a: Attribute, Literal(v, _)) =>
-          Some(sources.GreaterThan(a.name, v))
-        case GreaterThan(Literal(v, _), a: Attribute) =>
-          Some(sources.LessThan(a.name, v))
-        case LessThan(a: Attribute, Literal(v, _)) =>
-          Some(sources.LessThan(a.name, v))
-        case LessThan(Literal(v, _), a: Attribute) =>
-          Some(sources.GreaterThan(a.name, v))
-        case GreaterThanOrEqual(a: Attribute, Literal(v, _)) =>
-          Some(sources.GreaterThanOrEqual(a.name, v))
-        case GreaterThanOrEqual(Literal(v, _), a: Attribute) =>
-          Some(sources.LessThanOrEqual(a.name, v))
-        case LessThanOrEqual(a: Attribute, Literal(v, _)) =>
-          Some(sources.LessThanOrEqual(a.name, v))
-        case LessThanOrEqual(Literal(v, _), a: Attribute) =>
-          Some(sources.GreaterThanOrEqual(a.name, v))
-        // A null literal has no string form; it is left to the default case instead of
-        // failing with a NullPointerException on v.toString.
-        case StartsWith(a: Attribute, Literal(v, _)) if v != null =>
-          Some(sources.StringStartsWith(a.name, v.toString))
-
-        case others =>
-          if (!or) {
-            recordAttributes(others)
-          }
-          None
-      }
-    }
-
-    // Only the recording done by translate matters here, so no result collection is built.
-    filters.foreach(translate(_, false))
-  }
-
-  /**
-   * Tells whether the data type of the given expression is one of the types handled by the
-   * carbon filter conversion in this file.
-   *
-   * @param expr expression whose `dataType` is inspected
-   * @return true for string, integer, long, double, float, boolean, timestamp, array, struct
-   *         and decimal types; false for every other type
-   */
-  def isCarbonSupportedDataTypes(expr: Expression): Boolean = {
-    expr.dataType match {
-      case StringType | IntegerType | LongType | DoubleType | FloatType => true
-      case BooleanType | TimestampType => true
-      case ArrayType(_, _) | StructType(_) | DecimalType() => true
-      case _ => false
-    }
-  }
-
-  /**
-   * Converts spark filter expressions into one carbon filter expression.
-   *
-   * Every expression in `exprs` is converted on its own and the converted results are
-   * combined with AndExpression. An expression that cannot be converted is appended to
-   * `unprocessedExprs` and its attribute references are added to `attributesNeedToDecode`.
-   *
-   * @param exprs                  spark catalyst filter expressions
-   * @param attributesNeedToDecode receives the attributes referenced by unconverted expressions
-   * @param unprocessedExprs       receives the expressions that could not be converted
-   * @param carbonTable            table used to look up the data type of a referenced column
-   * @return the AND of all converted expressions, or None when nothing could be converted
-   */
-  def processExpression(exprs: Seq[Expression],
-      attributesNeedToDecode: java.util.HashSet[AttributeReference],
-      unprocessedExprs: ArrayBuffer[Expression],
-      carbonTable: CarbonTable): Option[CarbonExpression] = {
-    // `or` is true while converting below an Or. In that mode an unconvertible child only
-    // returns None; recording it is left to the outermost Or, which records the whole
-    // disjunction.
-    def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
-      expr match {
-        case orFilter@ Or(left, right)
-          if (isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right)) =>
-          val leftFilter = transformExpression(left, or = true)
-          val rightFilter = transformExpression(right, or = true)
-          if (leftFilter.isDefined && rightFilter.isDefined) {
-            Some(new OrExpression(leftFilter.get, rightFilter.get))
-          } else {
-            // Only an Or that is not itself nested below another Or records the failure.
-            if (!or) {
-              orFilter.collect {
-                case attr: AttributeReference => attributesNeedToDecode.add(attr)
-              }
-              unprocessedExprs += orFilter
-            }
-            None
-          }
-
-        case And(left, right) if (isCarbonSupportedDataTypes(left) &&
-                                  isCarbonSupportedDataTypes(right)) =>
-          val leftFilter = transformExpression(left, or)
-          val rightFilter = transformExpression(right, or)
-          // Below an Or both sides must convert; otherwise whichever side converted is kept.
-          if (or) {
-            if (leftFilter.isDefined && rightFilter.isDefined) {
-              (leftFilter ++ rightFilter).reduceOption(new AndExpression(_, _))
-            } else {
-              None
-            }
-          } else {
-            (leftFilter ++ rightFilter).reduceOption(new AndExpression(_, _))
-          }
-
-
-        // In the comparison cases below the operands are converted by recursive calls that
-        // are expected to reach the AttributeReference / Literal cases further down.
-        // NOTE(review): `.get` throws if an Attribute is not an AttributeReference - confirm
-        // that only resolved attributes reach this method.
-        case EqualTo(a: Attribute, l@Literal(v, t)) if (isCarbonSupportedDataTypes(a) &&
-                                                        isCarbonSupportedDataTypes(l)) =>
-          Some(
-            new EqualToExpression(
-              transformExpression(a).get,
-              transformExpression(l).get
-            )
-          )
-        case EqualTo(l@Literal(v, t), a: Attribute) if (isCarbonSupportedDataTypes(l) &&
-                                                        isCarbonSupportedDataTypes(a)) =>
-          Some(
-            new EqualToExpression(
-              transformExpression(a).get,
-              transformExpression(l).get
-            )
-          )
-
-        case Not(EqualTo(a: Attribute, l@Literal(v, t))) if (isCarbonSupportedDataTypes(a) &&
-                                                             isCarbonSupportedDataTypes(l)) =>
-          Some(
-            new NotEqualsExpression(
-              transformExpression(a).get,
-              transformExpression(l).get
-            )
-          )
-        case Not(EqualTo(l@Literal(v, t), a: Attribute)) if (isCarbonSupportedDataTypes(l) &&
-                                                             isCarbonSupportedDataTypes(a)) =>
-          Some(
-            new NotEqualsExpression(
-              transformExpression(a).get,
-              transformExpression(l).get
-            )
-          )
-        // Null checks are expressed as (not-)equals against a null literal.
-        // NOTE(review): the third constructor argument `true` presumably marks the comparison
-        // as a null check - confirm against the carbon expression classes.
-        case IsNotNull(child: Attribute) if (isCarbonSupportedDataTypes(child)) =>
-          Some(new NotEqualsExpression(transformExpression(child).get,
-            transformExpression(Literal(null)).get, true))
-        case IsNull(child: Attribute) if (isCarbonSupportedDataTypes(child)) =>
-          Some(new EqualToExpression(transformExpression(child).get,
-            transformExpression(Literal(null)).get, true))
-        // NOT IN over a list that contains a null literal is converted to a FalseExpression;
-        // otherwise it becomes a NotInExpression over the converted literals.
-        case Not(In(a: Attribute, list))
-          if !list.exists(!_.isInstanceOf[Literal]) && isCarbonSupportedDataTypes(a) =>
-          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
-            Some(new FalseExpression(transformExpression(a).get))
-          } else {
-            Some(new NotInExpression(transformExpression(a).get,
-              new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
-          }
-        case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) &&
-                                       isCarbonSupportedDataTypes(a) =>
-          Some(new InExpression(transformExpression(a).get,
-            new ListExpression(convertToJavaList(list
-              .map(transformExpression(_).get)))))
-
-        // With the literal on the left-hand side the comparison is mirrored, so the carbon
-        // expression always has the column as its first operand.
-        case GreaterThan(a: Attribute, l@Literal(v, t))
-          if (isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l)) =>
-          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case GreaterThan(l@Literal(v, t), a: Attribute)
-          if (isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a)) =>
-          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case LessThan(a: Attribute, l@Literal(v, t))
-          if (isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l)) =>
-          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case LessThan(l@Literal(v, t), a: Attribute)
-          if (isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a)) =>
-          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case GreaterThanOrEqual(a: Attribute, l@Literal(v, t))
-          if (isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l)) =>
-          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case GreaterThanOrEqual(l@Literal(v, t), a: Attribute)
-          if (isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a)) =>
-          Some(new LessThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-
-        case LessThanOrEqual(a: Attribute, l@Literal(v, t))
-          if (isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l)) =>
-          Some(new LessThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case LessThanOrEqual(l@Literal(v, t), a: Attribute)
-          if (isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a)) =>
-          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-
-        // Leaf conversions: a column reference takes the data type looked up from carbonTable
-        // (not the spark attribute's own type); a literal keeps its own data type.
-        case AttributeReference(name, dataType, _, _) =>
-          Some(new CarbonColumnExpression(name,
-            CarbonScalaUtil.convertSparkToCarbonDataType(
-              getActualCarbonDataType(name, carbonTable))))
-        case Literal(name, dataType) => Some(new
-            CarbonLiteralExpression(name, CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
-        // A non-empty prefix match is rewritten as the range
-        //   column >= prefix AND column < (prefix with its last character incremented by one).
-        // NOTE(review): if the literal value can be null, `pattern.toString` in the guard
-        // throws a NullPointerException; and when the last character is Char.MaxValue the
-        // increment wraps around to '\u0000', giving an upper bound below the prefix - confirm
-        // whether either input can occur.
-        case StartsWith(left : Attribute, right@Literal(pattern, dataType)) if
-        pattern.toString.size > 0 &&
-        isCarbonSupportedDataTypes
-        (left) &&
-        isCarbonSupportedDataTypes
-        (right) =>
-          val l = new GreaterThanEqualToExpression(transformExpression(left).get,
-            transformExpression(right).get)
-          val maxValueLimit = pattern.toString.substring(0, pattern.toString.length - 1) +
-                              (pattern.toString.charAt(pattern.toString.length - 1).toInt + 1)
-                                .toChar
-          val r = new LessThanExpression(
-            transformExpression(left).get,
-            new CarbonLiteralExpression(maxValueLimit,
-              CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
-          Some(new AndExpression(l, r))
-        // Anything else is not converted; outside an Or it is recorded for the caller.
-        case others =>
-          if (!or) {
-            others.collect {
-              case attr: AttributeReference => attributesNeedToDecode.add(attr)
-            }
-            unprocessedExprs += others
-          }
-          None
-      }
-    }
-    // Convert each top-level expression and AND the converted ones together.
-    exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _))
-  }
-  /**
-   * Tells whether the given expression is a null literal, i.e. a Literal whose data type is
-   * NullType or whose value is null.
-   *
-   * The check is written as a pattern match so that a null or non-literal argument yields
-   * false. The former boolean chain parsed as `(a && b && c) || d` and evaluated the cast in
-   * `d` even when the argument was null or not a Literal.
-   *
-   * @param exp expression to inspect; may be null or a non-literal expression
-   * @return true only for a null literal, false for everything else
-   */
-  private def isNullLiteral(exp: Expression): Boolean = {
-    exp match {
-      // A type pattern never matches null, so a null `exp` falls through to the default case.
-      case literal: Literal =>
-        literal.dataType == org.apache.spark.sql.types.DataTypes.NullType ||
-        literal.value == null
-      case _ => false
-    }
-  }
-  // Looks up the carbon data type of `column` in the fact table of `carbonTable` and returns
-  // the corresponding spark data type. The column is searched among the dimensions first and
-  // among the measures otherwise.
-  private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = {
-    val dimension: CarbonColumn =
-      carbonTable.getDimensionByName(carbonTable.getFactTableName, column)
-    val carbonType = Option(dimension).map(_.getDataType).getOrElse {
-      val measure: CarbonColumn =
-        carbonTable.getMeasureByName(carbonTable.getFactTableName, column)
-      // A measure keeps its type only when it is INT, SHORT, LONG or DECIMAL; any other
-      // measure type is reported as DOUBLE.
-      measure.getDataType match {
-        case kept @ (CarbonDataTypes.INT | CarbonDataTypes.SHORT |
-                     CarbonDataTypes.LONG | CarbonDataTypes.DECIMAL) => kept
-        case _ => CarbonDataTypes.DOUBLE
-      }
-    }
-    CarbonScalaUtil.convertCarbonToSparkDataType(carbonType)
-  }
-
-  // Convert a scala list to a java list. scalaList.asJava cannot be used because, while
-  // deserializing, the classes inside the scala list cannot be found and a
-  // ClassNotFoundException is thrown.
-  private def convertToJavaList(
-      scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
-    // Copy element by element into a plain java.util.ArrayList, keeping the original order.
-    val copied = new java.util.ArrayList[CarbonExpression]()
-    val elements = scalaList.iterator
-    while (elements.hasNext) {
-      copied.add(elements.next())
-    }
-    copied
-  }
-}