Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/10/19 10:33:11 UTC

[6/9] carbondata git commit: [CARBONDATA-1597] Remove spark1 integration

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
deleted file mode 100644
index 2e93a6c..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-import scala.language.implicitConversions
-
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.hive.{CarbonMetaData, CarbonMetastoreTypes}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DataType, StructType}
-
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.merger.TableMeta
-import org.apache.carbondata.spark.{CarbonOption, _}
-
-/**
- * Carbon relation provider compliant with the data source API.
- * Creates carbon relations.
- */
-class CarbonSource extends RelationProvider
-    with CreatableRelationProvider with HadoopFsRelationProvider with DataSourceRegister {
-
-  override def shortName(): String = "carbondata"
-
-  /**
-   * Returns a new base relation with the given parameters.
-   * Note: the parameters' keywords are case insensitive and this insensitivity is enforced
-   * by the Map that is passed to the function.
-   */
-  override def createRelation(
-      sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
-    // if a path is provided we can directly create a Hadoop relation,
-    // otherwise create a datasource relation
-    parameters.get("path") match {
-      case Some(path) => CarbonDatasourceHadoopRelation(sqlContext, Array(path), parameters, None)
-      case _ =>
-        val options = new CarbonOption(parameters)
-        val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq
-        val identifier = tableIdentifier match {
-          case Seq(name) => TableIdentifier(name, None)
-          case Seq(db, name) => TableIdentifier(name, Some(db))
-        }
-        CarbonDatasourceRelation(identifier, None)(sqlContext)
-    }
-  }
-
-  override def createRelation(
-      sqlContext: SQLContext,
-      mode: SaveMode,
-      parameters: Map[String, String],
-      data: SchemaRDD): BaseRelation = {
-
-    // To avoid the Derby problem, the dataframe needs to be written and read using CarbonContext
-    require(sqlContext.isInstanceOf[CarbonContext], "Error in saving dataframe to carbon file, " +
-        "must use CarbonContext to save dataframe")
-
-    // The user should not specify a path since only one store is currently supported in carbon;
-    // once multi-store is supported, this limitation can be removed
-    require(!parameters.contains("path"), "'path' should not be specified, " +
-        "the path to store carbon file is the 'storePath' specified when creating CarbonContext")
-
-    val options = new CarbonOption(parameters)
-    val storePath = CarbonContext.getInstance(sqlContext.sparkContext).storePath
-    val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
-    val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-        .exists(tablePath)
-    val (doSave, doAppend) = (mode, isExists) match {
-      case (SaveMode.ErrorIfExists, true) =>
-        sys.error(s"ErrorIfExists mode, path $storePath already exists.")
-      case (SaveMode.Overwrite, true) =>
-        val cc = CarbonContext.getInstance(sqlContext.sparkContext)
-        cc.sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
-        (true, false)
-      case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
-        (true, false)
-      case (SaveMode.Append, _) =>
-        (false, true)
-      case (SaveMode.Ignore, exists) =>
-        (!exists, false)
-    }
-
-    if (doSave) {
-      // write the data as a new table (Overwrite, or the table does not exist yet)
-      new CarbonDataFrameWriter(data).saveAsCarbonFile(parameters)
-    } else if (doAppend) {
-      new CarbonDataFrameWriter(data).appendToCarbonFile(parameters)
-    }
-
-    createRelation(sqlContext, parameters)
-  }
-
-  override def createRelation(sqlContext: SQLContext,
-      paths: Array[String],
-      dataSchema: Option[StructType],
-      partitionColumns: Option[StructType],
-      parameters: Map[String, String]): HadoopFsRelation = {
-    CarbonDatasourceHadoopRelation(sqlContext, paths, parameters, dataSchema)
-  }
-}
-
-/**
- * Creates a carbon relation compliant with the data source API.
- * This relation is stored in the Hive metastore.
- */
-private[sql] case class CarbonDatasourceRelation(
-    tableIdentifier: TableIdentifier,
-    alias: Option[String])
-    (@transient context: SQLContext)
-    extends BaseRelation with Serializable {
-
-  lazy val carbonRelation: CarbonRelation = {
-    CarbonEnv.get
-        .carbonMetastore.lookupRelation1(tableIdentifier, None)(sqlContext)
-        .asInstanceOf[CarbonRelation]
-  }
-
-  def getDatabaseName(): String = tableIdentifier.database.getOrElse("default")
-
-  def getTable(): String = tableIdentifier.table
-
-  def schema: StructType = carbonRelation.schema
-
-  def sqlContext: SQLContext = context
-
-  override def sizeInBytes: Long = carbonRelation.sizeInBytes
-}
-
-/**
- * Represents logical plan for one carbon table
- */
-case class CarbonRelation(
-    databaseName: String,
-    tableName: String,
-    var metaData: CarbonMetaData,
-    tableMeta: TableMeta,
-    alias: Option[String])(@transient sqlContext: SQLContext)
-    extends LeafNode with MultiInstanceRelation {
-
-  def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
-    childDim.getDataType.getName.toLowerCase match {
-      case "array" => s"${
-        childDim.getColName.substring(dimName.length + 1)
-      }:array<${ getArrayChildren(childDim.getColName) }>"
-      case "struct" => s"${
-        childDim.getColName.substring(dimName.length + 1)
-      }:struct<${ getStructChildren(childDim.getColName) }>"
-      case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
-    }
-  }
-
-  def getArrayChildren(dimName: String): String = {
-    metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
-      childDim.getDataType.getName.toLowerCase match {
-        case "array" => s"array<${ getArrayChildren(childDim.getColName) }>"
-        case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>"
-        case dType => addDecimalScaleAndPrecision(childDim, dType)
-      }
-    }).mkString(",")
-  }
-
-  def getStructChildren(dimName: String): String = {
-    metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
-      childDim.getDataType.getName.toLowerCase match {
-        case "array" => s"${
-          childDim.getColName.substring(dimName.length + 1)
-        }:array<${ getArrayChildren(childDim.getColName) }>"
-        case "struct" => s"${
-          childDim.getColName.substring(dimName.length + 1)
-        }:struct<${ metaData.carbonTable.getChildren(childDim.getColName)
-            .asScala.map(f => s"${ recursiveMethod(childDim.getColName, f) }").mkString(",")
-        }>"
-        case dType => s"${ childDim.getColName
-            .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
-      }
-    }).mkString(",")
-  }
-
-  override def newInstance(): LogicalPlan = {
-    CarbonRelation(databaseName, tableName, metaData, tableMeta, alias)(sqlContext)
-        .asInstanceOf[this.type]
-  }
-
-  val dimensionsAttr = {
-    val sett = new java.util.LinkedHashSet(tableMeta.carbonTable
-      .getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName).asScala.asJava)
-    sett.asScala.toSeq.filter(dim => !dim.isInvisible ||
-                                     (dim.isInvisible && dim.isInstanceOf[CarbonImplicitDimension]))
-      .map(dim => {
-      val dimval = metaData.carbonTable
-          .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
-      val output: DataType = dimval.getDataType.getName.toLowerCase match {
-        case "array" =>
-          CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
-        case "struct" =>
-          CarbonMetastoreTypes.toDataType(s"struct<${ getStructChildren(dim.getColName) }>")
-        case dType =>
-          val dataType = addDecimalScaleAndPrecision(dimval, dType)
-          CarbonMetastoreTypes.toDataType(dataType)
-      }
-
-      AttributeReference(
-        dim.getColName,
-        output,
-        nullable = true)(qualifiers = tableName +: alias.toSeq)
-    })
-  }
-
-  val measureAttr = {
-    val factTable = tableMeta.carbonTable.getFactTableName
-    new java.util.LinkedHashSet(
-      tableMeta.carbonTable.
-          getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
-          asScala.asJava).asScala.toSeq.filter(!_.getColumnSchema.isInvisible)
-        .map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType(
-          metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.getName
-              .toLowerCase match {
-            case "float" => "double"
-            case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
-            case others => others
-          }),
-          nullable = true)(qualifiers = tableName +: alias.toSeq))
-  }
-
-  override val output = {
-    val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
-        .asScala
-    columns.filter(!_.isInvisible).map { column =>
-      if (column.isDimension()) {
-        val output: DataType = column.getDataType.getName.toLowerCase match {
-          case "array" =>
-            CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")
-          case "struct" =>
-            CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>")
-          case dType =>
-            val dataType = addDecimalScaleAndPrecision(column, dType)
-            CarbonMetastoreTypes.toDataType(dataType)
-        }
-        AttributeReference(column.getColName, output,
-          nullable = true
-        )(qualifiers = tableName +: alias.toSeq)
-      } else {
-        AttributeReference(column.getColName, CarbonMetastoreTypes.toDataType(
-          column.getDataType.getName.toLowerCase match {
-            case "float" => "double"
-            case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column
-              .getColumnSchema.getScale + ")"
-            case others => others
-          }
-        ),
-          nullable = true
-        )(qualifiers = tableName +: alias.toSeq)
-      }
-    }
-  }
-  // TODO: Use data from the footers.
-  override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
-
-  override def equals(other: Any): Boolean = {
-    other match {
-      case p: CarbonRelation =>
-        p.databaseName == databaseName && p.output == output && p.tableName == tableName
-      case _ => false
-    }
-  }
-
-  def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
-    var dType = dataType
-    if (dimval.getDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DECIMAL) {
-      dType +=
-          "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
-    }
-    dType
-  }
-
-  private var tableStatusLastUpdateTime = 0L
-
-  private var sizeInBytesLocalValue = 0L
-
-  def sizeInBytes: Long = {
-    val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
-      tableMeta.carbonTable.getAbsoluteTableIdentifier)
-    if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
-      val tablePath = CarbonStorePath.getCarbonTablePath(
-        tableMeta.storePath,
-        tableMeta.carbonTableIdentifier).getPath
-      val fileType = FileFactory.getFileType(tablePath)
-      if(FileFactory.isFileExist(tablePath, fileType)) {
-        tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
-        sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
-      }
-    }
-    sizeInBytesLocalValue
-  }
-
-}
-
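
For reference, a minimal usage sketch of how the removed Spark 1.x CarbonSource relation provider
was typically driven from the DataFrame API. This sketch is not part of the commit; the store
path, the table name and the "tableName" option key are illustrative assumptions rather than
verified option names:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{CarbonContext, SaveMode}

    val sc = new SparkContext()                               // assumes master/app name set via SparkConf
    val cc = new CarbonContext(sc, "/tmp/carbon/store")       // hypothetical store path
    val df = cc.range(0, 10).toDF("id")

    // Writing goes through CarbonSource.createRelation(sqlContext, mode, parameters, data);
    // SaveMode.Overwrite drops any existing table before the dataframe is written.
    df.write
      .format("carbondata")
      .option("tableName", "sample")                          // hypothetical table name
      .mode(SaveMode.Overwrite)
      .save()

    // Without a "path" option, reading resolves a CarbonDatasourceRelation from the metastore.
    val loaded = cc.read.format("carbondata").option("tableName", "sample").load()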

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
deleted file mode 100644
index c14a61a..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.TaskContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
-import org.apache.spark.sql.hive.{CarbonMetastore, CarbonMetastoreTypes}
-import org.apache.spark.sql.optimizer.CarbonDecoderRelation
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
-import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
-import org.apache.carbondata.core.metadata.datatype.DataType
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.util.{CarbonTimeStatisticsFactory, DataTypeUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-
-/**
- * Decodes dictionary keys into their values
- */
-case class CarbonDictionaryDecoder(
-    relations: Seq[CarbonDecoderRelation],
-    profile: CarbonProfile,
-    aliasMap: CarbonAliasDecoderRelation,
-    child: SparkPlan)
-  (@transient sqlContext: SQLContext)
-  extends UnaryNode {
-
-  override def otherCopyArgs: Seq[AnyRef] = sqlContext :: Nil
-
-  override val output: Seq[Attribute] = {
-    child.output.map { a =>
-      val attr = aliasMap.getOrElse(a, a)
-      val relation = relations.find(p => p.contains(attr))
-      if (relation.isDefined && canBeDecoded(attr)) {
-        val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
-        val carbonDimension = carbonTable
-          .getDimensionByName(carbonTable.getFactTableName, attr.name)
-        if (carbonDimension != null &&
-            carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-            !carbonDimension.isComplex()) {
-          val newAttr = AttributeReference(a.name,
-            convertCarbonToSparkDataType(carbonDimension,
-              relation.get.carbonRelation.carbonRelation),
-            a.nullable,
-            a.metadata)(a.exprId,
-            a.qualifiers).asInstanceOf[Attribute]
-          newAttr
-        } else {
-          a
-        }
-      } else {
-        a
-      }
-    }
-  }
-
-
-  def canBeDecoded(attr: Attribute): Boolean = {
-    profile match {
-      case ip: IncludeProfile if ip.attributes.nonEmpty =>
-        ip.attributes
-          .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
-      case ep: ExcludeProfile =>
-        !ep.attributes
-          .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
-      case _ => true
-    }
-  }
-
-  def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
-      relation: CarbonRelation): types.DataType = {
-    carbonDimension.getDataType match {
-      case CarbonDataTypes.STRING => StringType
-      case CarbonDataTypes.SHORT => ShortType
-      case CarbonDataTypes.INT => IntegerType
-      case CarbonDataTypes.LONG => LongType
-      case CarbonDataTypes.DOUBLE => DoubleType
-      case CarbonDataTypes.BOOLEAN => BooleanType
-      case CarbonDataTypes.DECIMAL =>
-        val scale: Int = carbonDimension.getColumnSchema.getScale
-        val precision: Int = carbonDimension.getColumnSchema.getPrecision
-        if (scale == 0 && precision == 0) {
-          DecimalType(18, 2)
-        } else {
-          DecimalType(precision, scale)
-        }
-      case CarbonDataTypes.TIMESTAMP => TimestampType
-      case CarbonDataTypes.DATE => DateType
-      case CarbonDataTypes.STRUCT =>
-        CarbonMetastoreTypes
-          .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
-      case CarbonDataTypes.ARRAY =>
-        CarbonMetastoreTypes
-          .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
-    }
-  }
-
-  val getDictionaryColumnIds = {
-    val attributes = child.output
-    val dictIds: Array[(String, ColumnIdentifier, DataType, CarbonDimension)] =
-      attributes.map { a =>
-        val attr = aliasMap.getOrElse(a, a)
-        val relation = relations.find(p => p.contains(attr))
-        if (relation.isDefined && canBeDecoded(attr)) {
-          val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
-          val carbonDimension =
-            carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-          if (carbonDimension != null &&
-              carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-              !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-              !carbonDimension.isComplex()) {
-            (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
-              carbonDimension.getDataType, carbonDimension)
-          } else {
-            (null, null, null, null)
-          }
-        } else {
-          (null, null, null, null)
-        }
-
-      }.toArray
-    dictIds
-  }
-
-  override def outputsUnsafeRows: Boolean = true
-
-  override def canProcessUnsafeRows: Boolean = true
-
-  override def canProcessSafeRows: Boolean = true
-
-  override def doExecute(): RDD[InternalRow] = {
-    attachTree(this, "execute") {
-      val storePath = sqlContext.catalog.asInstanceOf[CarbonMetastore].storePath
-      val queryId = sqlContext.getConf("queryId", System.nanoTime() + "")
-      val absoluteTableIdentifiers = relations.map { relation =>
-        val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
-        (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
-      }.toMap
-
-      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId)
-      if (isRequiredToDecode) {
-        val dataTypes = child.output.map { attr => attr.dataType }
-        child.execute().mapPartitions { iter =>
-          val cacheProvider: CacheProvider = CacheProvider.getInstance
-          val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
-            cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
-          val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
-            forwardDictionaryCache)
-          val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
-          // add a task completion listener to clear the dictionaries; this is a decisive
-          // factor for the LRU eviction policy
-          val dictionaryTaskCleaner = TaskContext.get
-          dictionaryTaskCleaner.addTaskCompletionListener(context =>
-            dicts.foreach { dictionary =>
-              if (null != dictionary) {
-                dictionary.clear()
-              }
-            }
-          )
-          new Iterator[InternalRow] {
-            val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
-
-            override final def hasNext: Boolean = {
-              iter.hasNext
-            }
-
-            override final def next(): InternalRow = {
-              val row: InternalRow = iter.next()
-              val data = row.toSeq(dataTypes).toArray
-              dictIndex.foreach { index =>
-                if (data(index) != null) {
-                  data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
-                    .getDictionaryValueForKeyInBytes(data(index).asInstanceOf[Int]),
-                    getDictionaryColumnIds(index)._4)
-                }
-              }
-              val result = unsafeProjection(new GenericMutableRow(data))
-              result
-            }
-          }
-        }
-      } else {
-        child.execute()
-      }
-    }
-  }
-
-  private def isRequiredToDecode = {
-    getDictionaryColumnIds.find(p => p._1 != null) match {
-      case Some(value) => true
-      case _ => false
-    }
-  }
-
-  private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
-      cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
-    val dictionaryColumnIds = getDictionaryColumnIds.map { dictionaryId =>
-      if (dictionaryId._2 != null) {
-        new DictionaryColumnUniqueIdentifier(
-          atiMap(dictionaryId._1).getCarbonTableIdentifier,
-          dictionaryId._2, dictionaryId._3,
-          CarbonStorePath.getCarbonTablePath(atiMap(dictionaryId._1)))
-      } else {
-        null
-      }
-    }
-    try {
-      val noDictionaryIndexes = new java.util.ArrayList[Int]()
-      dictionaryColumnIds.zipWithIndex.foreach { columnIndex =>
-        if (columnIndex._1 == null) {
-          noDictionaryIndexes.add(columnIndex._2)
-        }
-      }
-      val dict = cache.getAll(dictionaryColumnIds.filter(_ != null).toSeq.asJava);
-      val finalDict = new java.util.ArrayList[Dictionary]()
-      var dictIndex: Int = 0
-      dictionaryColumnIds.zipWithIndex.foreach { columnIndex =>
-        if (!noDictionaryIndexes.contains(columnIndex._2)) {
-          finalDict.add(dict.get(dictIndex))
-          dictIndex += 1
-        } else {
-          finalDict.add(null)
-        }
-      }
-      finalDict.asScala
-    } catch {
-      case t: Throwable => Seq.empty
-    }
-
-  }
-
-}
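
A simplified, self-contained illustration (plain Scala maps, not CarbonData's Dictionary/Cache
API) of the idea the removed CarbonDictionaryDecoder implements: dictionary-encoded columns carry
integer surrogate keys in each row, and the decoder swaps every key for its looked-up value while
other columns pass through unchanged. The column names and values are hypothetical:

    object DictionaryDecodeSketch {

      // hypothetical forward dictionary for one column: surrogate key -> value
      val cityDictionary: Map[Int, String] = Map(1 -> "Beijing", 2 -> "Bangalore", 3 -> "Shenzhen")

      // per-column dictionaries; None means the column is not dictionary encoded
      val dictionaries: Array[Option[Map[Int, String]]] = Array(None, Some(cityDictionary))

      def decodeRow(row: Array[Any]): Array[Any] =
        row.zipWithIndex.map { case (cell, i) =>
          dictionaries(i) match {
            case Some(dict) => dict(cell.asInstanceOf[Int])   // replace surrogate key with value
            case None       => cell                           // non-dictionary column, pass through
          }
        }

      def main(args: Array[String]): Unit = {
        println(decodeRow(Array[Any](42L, 2)).mkString(", "))  // prints: 42, Bangalore
      }
    }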

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
deleted file mode 100644
index 36cd6f2..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.sql.hive.{CarbonIUDAnalysisRule, CarbonMetastore}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport
-import org.apache.carbondata.spark.rdd.SparkReadSupport
-
-case class CarbonEnv(carbonMetastore: CarbonMetastore)
-
-object CarbonEnv {
-
-  @volatile private var carbonEnv: CarbonEnv = _
-
-  // set the read support class globally so that the executors can get it.
-  SparkReadSupport.readSupportClass = classOf[RawDataReadSupport]
-
-  var initialized = false
-
-  def init(sqlContext: SQLContext): Unit = {
-    if (!initialized) {
-      val cc = sqlContext.asInstanceOf[CarbonContext]
-      val catalog = new CarbonMetastore(cc, cc.storePath, cc.hiveClientInterface, "")
-      carbonEnv = CarbonEnv(catalog)
-      CarbonIUDAnalysisRule.init(sqlContext)
-      initialized = true
-      CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
-    }
-  }
-
-  def get: CarbonEnv = {
-    if (initialized) carbonEnv
-    else throw new RuntimeException("CarbonEnv not initialized")
-  }
-}
-
-
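
A short sketch of the init-before-get contract the removed CarbonEnv enforced; the CarbonContext
construction, the existing SparkContext `sc` and the store path are assumptions for illustration:

    // CarbonEnv.get throws until init has been called with a CarbonContext-backed SQLContext.
    val cc = new CarbonContext(sc, "/tmp/carbon/store")   // hypothetical store path
    CarbonEnv.init(cc)                                    // builds the CarbonMetastore once
    val metastore = CarbonEnv.get.carbonMetastore         // safe only after init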

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala
deleted file mode 100644
index 6ed8c0d..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.SQLConf.SQLConfEntry
-import org.apache.spark.sql.hive.CarbonSQLDialect
-
- /**
-  * A class that enables the setting and getting of mutable config parameters/hints.
-  *
-  */
-class CarbonSQLConf extends SQLConf {
-
-  override def dialect: String = {
-    getConf(SQLConf.DIALECT,
-      classOf[CarbonSQLDialect].getCanonicalName)
-  }
-
-  override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
deleted file mode 100644
index a3c6343..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.CarbonInputMetrics
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.LeafNode
-import org.apache.spark.sql.hive.CarbonMetastore
-
-import org.apache.carbondata.core.scan.model._
-import org.apache.carbondata.hadoop.{CarbonProjection, InputMetricsStats}
-import org.apache.carbondata.spark.CarbonFilters
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
-
-case class CarbonScan(
-    var columnProjection: Seq[Attribute],
-    relationRaw: CarbonRelation,
-    dimensionPredicatesRaw: Seq[Expression],
-    useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode {
-  val carbonTable = relationRaw.metaData.carbonTable
-  val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
-  val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
-  @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastore]
-
-  val attributesNeedToDecode = new java.util.LinkedHashSet[AttributeReference]()
-  val unprocessedExprs = new ArrayBuffer[Expression]()
-
-  val buildCarbonPlan: CarbonQueryPlan = {
-    val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.databaseName, relationRaw.tableName)
-    plan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
-    processFilterExpressions(plan)
-    plan
-  }
-
-  def processFilterExpressions(plan: CarbonQueryPlan) {
-    if (dimensionPredicatesRaw.nonEmpty) {
-      val expressionVal = CarbonFilters.processExpression(
-        dimensionPredicatesRaw,
-        attributesNeedToDecode,
-        unprocessedExprs,
-        carbonTable)
-      expressionVal match {
-        case Some(ce) =>
-          // add the dimensions used in the expression to the query stats
-          plan.setFilterExpression(ce)
-        case _ =>
-      }
-    }
-    processExtraAttributes(plan)
-  }
-
-  private def processExtraAttributes(plan: CarbonQueryPlan) {
-    if (attributesNeedToDecode.size() > 0) {
-      val attributeOut = new ArrayBuffer[Attribute]() ++ columnProjection
-
-      attributesNeedToDecode.asScala.foreach { attr =>
-        if (!columnProjection.exists(_.name.equalsIgnoreCase(attr.name))) {
-          attributeOut += attr
-        }
-      }
-      columnProjection = attributeOut
-    }
-
-    val columns = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName)
-    columns.addAll(carbonTable.getImplicitDimensionByTableName(carbonTable.getFactTableName))
-    val colAttr = new Array[Attribute](columns.size())
-    columnProjection.foreach { attr =>
-      val column =
-        carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
-      if (column != null) {
-        colAttr(columns.indexOf(column)) = attr
-      }
-    }
-
-    columnProjection = colAttr.filter(f => f != null)
-
-    var queryOrder: Integer = 0
-    columnProjection.foreach { attr =>
-      val carbonColumn = carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
-      if (carbonColumn != null) {
-        if (carbonColumn.isDimension()) {
-          val dim = new QueryDimension(attr.name)
-          dim.setQueryOrder(queryOrder)
-          queryOrder = queryOrder + 1
-          selectedDims += dim
-        } else {
-          val m1 = new QueryMeasure(attr.name)
-          m1.setQueryOrder(queryOrder)
-          queryOrder = queryOrder + 1
-          selectedMsrs += m1
-        }
-      }
-    }
-
-    // Fill the selected dimensions and measures obtained from the
-    // attributes into the query plan for the detailed query
-    selectedDims.foreach(plan.addDimension)
-    selectedMsrs.foreach(plan.addMeasure)
-  }
-
-  def inputRdd: CarbonScanRDD = {
-    val projection = new CarbonProjection
-    columnProjection.foreach { attr =>
-      projection.addColumn(attr.name)
-    }
-    val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
-    new CarbonScanRDD(
-      ocRaw.sparkContext,
-      projection,
-      buildCarbonPlan.getFilterExpression,
-      carbonTable.getAbsoluteTableIdentifier,
-      carbonTable.getTableInfo.serialize(),
-      carbonTable.getTableInfo, inputMetricsStats
-    )
-  }
-
-  override def outputsUnsafeRows: Boolean =
-    (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
-
-  override def doExecute(): RDD[InternalRow] = {
-    val outUnsafeRows: Boolean = (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
-    inputRdd.mapPartitions { iter =>
-      val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
-      new Iterator[InternalRow] {
-        override def hasNext: Boolean = iter.hasNext
-
-        override def next(): InternalRow = {
-          val value = iter.next
-          if (outUnsafeRows) {
-            unsafeProjection(value)
-          } else {
-            value
-          }
-        }
-      }
-    }
-  }
-
-  def output: Seq[Attribute] = {
-    columnProjection
-  }
-
-}
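
A stand-alone sketch (plain collections, not the Spark planner types above) of the projection
reordering that processExtraAttributes performs: the requested columns are rearranged into the
table's create order and then numbered with an increasing query order; the column names are
hypothetical:

    object ProjectionOrderSketch {
      def main(args: Array[String]): Unit = {
        val createOrder = Seq("id", "name", "city", "salary")   // table's create order
        val requested   = Seq("salary", "city")                 // projection coming from the query

        // keep only the requested columns, but in create order, then assign query order
        val ordered    = createOrder.filter(c => requested.contains(c))
        val queryOrder = ordered.zipWithIndex                   // (column name, query order)

        println(queryOrder.mkString(", "))                      // prints: (city,0), (salary,1)
      }
    }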

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
deleted file mode 100644
index bc62a55..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
-
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
-
-case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
-
-object CarbonSparkUtil {
-
-  def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
-    val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
-        .asScala.map(x => x.getColName)
-    // wf : may be problem
-    val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
-        .asScala.map(x => x.getColName)
-    val dictionary =
-      carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
-        (f.getColName.toLowerCase,
-            f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-                !f.getDataType.isComplexType)
-      }
-    CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
-  }
-}
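
The DictionaryMap built above flags a column as dictionary-decodable only when it carries the
DICTIONARY encoding, does not carry DIRECT_DICTIONARY, and is not a complex type. A stand-alone,
REPL-style sketch of that predicate over plain data (not the CarbonTable API); the column names
are hypothetical:

    final case class ColumnInfo(name: String, encodings: Set[String], isComplex: Boolean)

    def usesForwardDictionary(col: ColumnInfo): Boolean =
      col.encodings.contains("DICTIONARY") &&
        !col.encodings.contains("DIRECT_DICTIONARY") &&
        !col.isComplex

    val columns = Seq(
      ColumnInfo("city", Set("DICTIONARY"), isComplex = false),                      // true
      ColumnInfo("date", Set("DICTIONARY", "DIRECT_DICTIONARY"), isComplex = false), // false
      ColumnInfo("tags", Set("DICTIONARY"), isComplex = true)                        // false
    )
    val dictionaryMap = columns.map(c => c.name.toLowerCase -> usesForwardDictionary(c)).toMap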

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
deleted file mode 100644
index 9dc9ee2..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ /dev/null
@@ -1,589 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.Map
-import scala.language.implicitConversions
-
-import org.apache.hadoop.hive.ql.lib.Node
-import org.apache.hadoop.hive.ql.parse._
-import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
-import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.ExplainCommand
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.DescribeCommand
-import org.apache.spark.sql.hive.HiveQlWrapper
-import org.apache.spark.sql.types.StructField
-
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.util.CommonUtil
-
-/**
- * Parser for all Carbon DDL and DML cases in a unified context
- */
-class CarbonSqlParser() extends CarbonDDLSqlParser {
-
-  override def parse(input: String): LogicalPlan = {
-    synchronized {
-      // Initialize the Keywords.
-      initLexical
-      phrase(start)(new lexical.Scanner(input)) match {
-        case Success(plan, _) => plan match {
-          case x: LoadTable =>
-            x.inputSqlString = input
-            x
-          case logicalPlan => logicalPlan
-        }
-        case failureOrError => sys.error(failureOrError.toString)
-      }
-    }
-  }
-
-  override protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
-
-  protected lazy val startCommand: Parser[LogicalPlan] =
-    createDatabase | dropDatabase | loadManagement | describeTable |
-      showPartitions | showLoads | alterTable | updateTable | deleteRecords | useDatabase |
-      createTable
-
-  protected lazy val loadManagement: Parser[LogicalPlan] =
-    deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
-
-  protected lazy val createDatabase: Parser[LogicalPlan] =
-    CREATE ~> (DATABASE | SCHEMA) ~> restInput ^^ {
-      case statement =>
-        val createDbSql = "CREATE DATABASE " + statement
-        var dbName = ""
-        // Get Ast node for create db command
-        val node = HiveQlWrapper.getAst(createDbSql)
-        node match {
-          // get dbname
-          case Token("TOK_CREATEDATABASE", children) =>
-            dbName = BaseSemanticAnalyzer.unescapeIdentifier(children(0).getText)
-        }
-        CreateDatabase(convertDbNameToLowerCase(dbName), createDbSql)
-    }
-
-  protected lazy val dropDatabase: Parser[LogicalPlan] =
-    DROP ~> (DATABASE | SCHEMA) ~> restInput ^^ {
-      case statement =>
-        val dropDbSql = "DROP DATABASE " + statement
-        var dbName = ""
-        var isCascade = false
-        // Get Ast node for drop db command
-        val node = HiveQlWrapper.getAst(dropDbSql)
-        node match {
-          case Token("TOK_DROPDATABASE", children) =>
-            dbName = BaseSemanticAnalyzer.unescapeIdentifier(children(0).getText)
-            // check whether cascade drop db
-            children.collect {
-              case t@Token("TOK_CASCADE", _) =>
-                isCascade = true
-              case _ => // Unsupported features
-            }
-        }
-        DropDatabase(convertDbNameToLowerCase(dbName), isCascade, dropDbSql)
-    }
-
-  protected lazy val alterTable: Parser[LogicalPlan] =
-    ALTER ~> TABLE ~> restInput ^^ {
-      case statement =>
-        try {
-          val alterSql = "alter table " + statement
-          // The DDL is parsed and the AST tree is obtained from HiveQl
-          val node = HiveQlWrapper.getAst(alterSql)
-          // processing the AST tree
-          nodeToPlanForAlterTable(node, alterSql)
-        } catch {
-          // MalformedCarbonCommandException needs to be thrown directly, the parser will catch it
-          case ce: MalformedCarbonCommandException =>
-            throw ce
-        }
-    }
-
-  /**
-   * Handles the CREATE TABLE DDL syntax, compatible with the Hive syntax
-   */
-  protected lazy val createTable: Parser[LogicalPlan] =
-    restInput ^^ {
-
-      case statement =>
-        try {
-          // The DDL is parsed and the AST tree is obtained from HiveQl
-          val node = HiveQlWrapper.getAst(statement)
-          // processing the AST tree
-          nodeToPlan(node)
-        } catch {
-          // MalformedCarbonCommandException needs to be thrown directly, the parser will catch it
-          case ce: MalformedCarbonCommandException =>
-            throw ce
-          case e: Exception =>
-            sys.error("Parsing error") // no need to do anything.
-        }
-    }
-
-  /**
-   * This function traverses the AST and builds the logical plan from it.
-   *
-   * @param node
-   * @return LogicalPlan
-   */
-  protected def nodeToPlan(node: Node): LogicalPlan = {
-    node match {
-      // handle only when the CREATE TABLE token is found.
-      case Token("TOK_CREATETABLE", children) =>
-
-
-        var fields: Seq[Field] = Seq[Field]()
-        var tableComment: String = ""
-        var tableProperties = Map[String, String]()
-        var partitionByFields: Seq[Field] = Seq[Field]()
-        var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
-        var likeTableName: String = ""
-        var storedBy: String = ""
-        var ifNotExistPresent: Boolean = false
-        var dbName: Option[String] = None
-        var tableName: String = ""
-        var bucketFields: Option[BucketFields] = None
-
-        try {
-
-          // Check whether the create table request is for a carbon table
-          children.collect {
-            case Token("TOK_STORAGEHANDLER", child :: Nil) =>
-              storedBy = BaseSemanticAnalyzer.unescapeSQLString(child.getText).trim.toLowerCase
-            case _ =>
-          }
-          if (!(storedBy.equals(CarbonContext.datasourceName) ||
-                storedBy.equals(CarbonContext.datasourceShortName))) {
-            sys.error("Not a carbon format request")
-          }
-
-          children.collect {
-            // collect the complete field list
-            case list@Token("TOK_TABCOLLIST", _) =>
-              val cols = BaseSemanticAnalyzer.getColumns(list, true)
-              if (cols != null) {
-                val dupColsGrp = cols.asScala.groupBy(x => x.getName) filter {
-                  case (_, colList) => colList.size > 1
-                }
-                if (dupColsGrp.nonEmpty) {
-                  var columnName: String = ""
-                  dupColsGrp.toSeq.foreach(columnName += _._1 + ", ")
-                  columnName = columnName.substring(0, columnName.lastIndexOf(", "))
-                  val errorMessage = "Duplicate column name: " + columnName + " found in table. " +
-                                     "Please check the create table statement."
-                  throw new MalformedCarbonCommandException(errorMessage)
-                }
-                cols.asScala.map { col =>
-                  val columnName = col.getName()
-                  val dataType = Option(col.getType)
-                  val name = Option(col.getName())
-                  // This is to parse complex data types
-                  val x = '`' + col.getName + '`' + ' ' + col.getType
-                  val f: Field = anyFieldDef(new lexical.Scanner(x))
-                  match {
-                    case Success(field, _) => field
-                    case failureOrError => throw new MalformedCarbonCommandException(
-                      s"Unsupported data type: ${col.getType}")
-                  }
-                  // the data type of a decimal column looks like decimal(10,0),
-                  // so check the prefix of the string and extract the precision and scale,
-                  // then reset the data type to plain decimal
-                  if (f.dataType.getOrElse("").startsWith("decimal")) {
-                    val (precision, scale) = getScaleAndPrecision(col.getType)
-                    f.precision = precision
-                    f.scale = scale
-                    f.dataType = Some("decimal")
-                  }
-                  if (f.dataType.getOrElse("").startsWith("char")) {
-                    f.dataType = Some("char")
-                  } else if (f.dataType.getOrElse("").startsWith("float")) {
-                    f.dataType = Some("float")
-                  }
-                  f.rawSchema = x
-                  fields ++= Seq(f)
-                }
-              }
-
-            case Token("TOK_IFNOTEXISTS", _) =>
-              ifNotExistPresent = true
-
-            case t@Token("TOK_TABNAME", _) =>
-              val (db, tblName) = extractDbNameTableName(t)
-              dbName = db
-              tableName = tblName.toLowerCase()
-
-            case Token("TOK_TABLECOMMENT", child :: Nil) =>
-              tableComment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
-
-            case Token("TOK_TABLEPARTCOLS", list@Token("TOK_TABCOLLIST", _) :: Nil) =>
-              val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
-              if (cols != null) {
-                cols.asScala.map { col =>
-                  val columnName = col.getName()
-                  val dataType = Option(col.getType)
-                  val comment = col.getComment
-                  val rawSchema = '`' + col.getName + '`' + ' ' + col.getType
-                  val field = Field(columnName, dataType, Some(columnName), None)
-
-                  // the data type of a decimal column looks like decimal(10,0),
-                  // so check the prefix of the string and extract the precision and scale,
-                  // then reset the data type to plain decimal
-                  if (field.dataType.getOrElse("").startsWith("decimal")) {
-                    val (precision, scale) = getScaleAndPrecision(col.getType)
-                    field.precision = precision
-                    field.scale = scale
-                    field.dataType = Some("decimal")
-                  }
-                  if (field.dataType.getOrElse("").startsWith("char")) {
-                    field.dataType = Some("char")
-                  } else if (field.dataType.getOrElse("").startsWith("float")) {
-                    field.dataType = Some("float")
-                  }
-                  field.rawSchema = rawSchema
-                  val partitionCol = new PartitionerField(columnName, dataType, comment)
-                  partitionCols ++= Seq(partitionCol)
-                  partitionByFields ++= Seq(field)
-                }
-              }
-            case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
-              val propertySeq: Seq[(String, String)] = getProperties(list)
-              val repeatedProperties = propertySeq.groupBy(_._1).filter(_._2.size > 1).keySet
-              if (repeatedProperties.nonEmpty) {
-                val repeatedPropStr: String = repeatedProperties.mkString(",")
-                throw new MalformedCarbonCommandException("Repeated table properties: " +
-                                                          repeatedPropStr)
-              }
-              tableProperties ++= propertySeq
-
-            case Token("TOK_LIKETABLE", child :: Nil) =>
-              likeTableName = child.getChild(0).getText()
-            case Token("TOK_ALTERTABLE_BUCKETS",
-            Token("TOK_TABCOLNAME", list) :: numberOfBuckets) =>
-              val cols = list.map(_.getText)
-              if (cols != null) {
-                bucketFields = Some(BucketFields(cols,
-                  numberOfBuckets.head.getText.toInt))
-              }
-
-            case _ => // Unsupported features
-          }
-
-          // validate tblProperties
-          if (!CommonUtil.validateTblProperties(tableProperties, fields)) {
-            throw new MalformedCarbonCommandException("Invalid table properties")
-          }
-
-          if (partitionCols.nonEmpty) {
-            if (!CommonUtil.validatePartitionColumns(tableProperties, partitionCols)) {
-              throw new MalformedCarbonCommandException("Invalid partition definition")
-            }
-            // partition columns should not be part of the schema
-            val colNames = fields.map(_.column)
-            val badPartCols = partitionCols.map(_.partitionColumn).toSet.intersect(colNames.toSet)
-            if (badPartCols.nonEmpty) {
-              throw new MalformedCarbonCommandException(
-                "Partition columns should not be specified in the schema: " +
-                badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"))
-            }
-            fields ++= partitionByFields
-          }
-
-          // prepare table model of the collected tokens
-          val tableModel: TableModel = prepareTableModel(ifNotExistPresent,
-            dbName,
-            tableName,
-            fields,
-            partitionCols,
-            tableProperties,
-            bucketFields)
-
-          // get logical plan.
-          CreateTable(tableModel)
-        } catch {
-          case ce: MalformedCarbonCommandException =>
-            val message = if (tableName.isEmpty) {
-              "Create table command failed. "
-            } else if (dbName.isEmpty) {
-              s"Create table command failed for $tableName. "
-            } else {
-              s"Create table command failed for ${ dbName.get }.$tableName. "
-            }
-            LOGGER.audit(message + ce.getMessage)
-            throw ce
-        }
-
-    }
-  }
-
-  /**
-   * This function traverses the AST and builds the logical plan from it.
-   *
-   * @param node
-   * @return LogicalPlan
-   */
-  protected def nodeToPlanForAlterTable(node: Node, alterSql: String): LogicalPlan = {
-    node match {
-      // handle only when the ALTER TABLE token is found.
-      case Token("TOK_ALTERTABLE", children) =>
-
-        var dbName: Option[String] = None
-        var tableName: String = ""
-        var compactionType: String = ""
-
-        children.collect {
-
-          case t@Token("TOK_TABNAME", _) =>
-            val (db, tblName) = extractDbNameTableName(t)
-            dbName = db
-            tableName = tblName
-
-          case Token("TOK_ALTERTABLE_COMPACT", child :: Nil) =>
-            compactionType = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
-
-          case _ => // Unsupported features
-        }
-
-        val altertablemodel = AlterTableModel(dbName,
-          tableName,
-          None,
-          compactionType,
-          Some(System.currentTimeMillis()),
-          alterSql)
-        AlterTableCompaction(altertablemodel)
-    }
-  }
-
-  protected lazy val loadDataNew: Parser[LogicalPlan] =
-    LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
-    (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
-    (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
-      case filePath ~ isOverwrite ~ table ~ optionsList =>
-        val (databaseNameOp, tableName) = table match {
-          case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
-        }
-        if (optionsList.isDefined) {
-          validateOptions(optionsList)
-        }
-        val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
-        LoadTable(convertDbNameToLowerCase(databaseNameOp), tableName, filePath, Seq(), optionsMap,
-          isOverwrite.isDefined)
-    }
-
-  protected lazy val describeTable: Parser[LogicalPlan] =
-    ((DESCRIBE | DESC) ~> opt(EXTENDED | FORMATTED)) ~ (ident <~ ".").? ~ ident ^^ {
-      case ef ~ db ~ tbl =>
-        val tblIdentifier = db match {
-          case Some(dbName) =>
-            TableIdentifier(tbl.toLowerCase, Some(convertDbNameToLowerCase(dbName)))
-          case None =>
-            TableIdentifier(tbl.toLowerCase, None)
-        }
-        if (ef.isDefined && "FORMATTED".equalsIgnoreCase(ef.get)) {
-          new DescribeFormattedCommand("describe formatted " + tblIdentifier,
-            tblIdentifier)
-        } else {
-          new DescribeCommand(UnresolvedRelation(tblIdentifier, None), ef.isDefined)
-        }
-    }
-
-  protected lazy val showLoads: Parser[LogicalPlan] =
-    SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
-    (LIMIT ~> numericLit).? <~
-    opt(";") ^^ {
-      case databaseName ~ tableName ~ limit =>
-        ShowLoadsCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase(), limit)
-    }
-
-  protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
-  DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~
-  (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",")) <~ ")" ~ opt(";") ^^ {
-    case dbName ~ tableName ~ loadids =>
-      DeleteLoadsById(loadids, convertDbNameToLowerCase(dbName), tableName.toLowerCase())
-  }
-
-  protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
-    DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
-    (WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~
-    opt(";") ^^ {
-      case database ~ table ~ condition =>
-        condition match {
-          case dateField ~ dateValue =>
-            DeleteLoadsByLoadDate(convertDbNameToLowerCase(database),
-              table.toLowerCase(),
-              dateField,
-              dateValue)
-        }
-    }
-
-  protected lazy val cleanFiles: Parser[LogicalPlan] =
-    CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
-      case databaseName ~ tableName =>
-        CleanFiles(convertDbNameToLowerCase(databaseName), tableName.toLowerCase())
-    }
-
-  protected lazy val explainPlan: Parser[LogicalPlan] =
-    (EXPLAIN ~> opt(EXTENDED)) ~ startCommand ^^ {
-      case isExtended ~ logicalPlan =>
-        logicalPlan match {
-          case plan: CreateTable => ExplainCommand(logicalPlan, extended = isExtended.isDefined)
-          case _ => ExplainCommand(OneRowRelation)
-        }
-    }
-
-  protected lazy val deleteRecords: Parser[LogicalPlan] =
-    (DELETE ~> FROM ~> table) ~ restInput.? <~ opt(";") ^^ {
-      case table ~ rest =>
-        val tableName = getTableName(table.tableIdentifier)
-        val alias = table.alias.getOrElse("")
-        DeleteRecords("select tupleId from " + tableName + " " + alias + rest.getOrElse(""), table)
-    }
-
-  protected lazy val updateTable: Parser[LogicalPlan] =
-    UPDATE ~> table ~
-    (SET ~> "(" ~> repsep(element, ",") <~ ")") ~
-    ("=" ~> restInput) <~ opt(";") ^^ {
-      case tab ~ columns ~ rest =>
-        val (sel, where) = splitQuery(rest)
-        val (selectStmt, relation) =
-          if (!sel.toLowerCase.startsWith("select ")) {
-            if (sel.trim.isEmpty) {
-              sys.error("At least one source column has to be specified ")
-            }
-            // only a list of expressions is given, so convert it into a
-            // select statement on the destination table
-            val relation = tab match {
-              case r@UnresolvedRelation(tableIdentifier, alias) =>
-                updateRelation(r, tableIdentifier, alias)
-              case _ => tab
-            }
-            ("select " + sel + " from " + getTableName(relation.tableIdentifier) + " " +
-             relation.alias.get, relation)
-          } else {
-            (sel, updateRelation(tab, tab.tableIdentifier, tab.alias))
-          }
-        UpdateTable(relation, columns, selectStmt, where)
-    }
-  protected lazy val showPartitions: Parser[LogicalPlan] =
-    (SHOW ~> PARTITIONS ~> table) <~ opt(";") ^^ {
-      case table =>
-        val tableName = getTableName(table.tableIdentifier)
-        val alias = table.alias.getOrElse("")
-        ShowPartitions(table.tableIdentifier)
-    }
-
-  private def splitQuery(query: String): (String, String) = {
-    val stack = scala.collection.mutable.Stack[Char]()
-    var foundSingleQuotes = false
-    var foundDoubleQuotes = false
-    var foundEscapeChar = false
-    var ignoreChar = false
-    var stop = false
-    var bracketCount = 0
-    val (selectStatement, where) = query.span {
-      ch => {
-        if (stop) {
-          false
-        } else {
-          ignoreChar = false
-          if (foundEscapeChar && (ch == '\'' || ch == '\"' || ch == '\\')) {
-            foundEscapeChar = false
-            ignoreChar = true
-          }
-          // Escaped single or double quotes must not toggle the quote state
-          if (!ignoreChar) {
-            if (ch == '\\') {
-              foundEscapeChar = true
-            } else if (ch == '\'') {
-              foundSingleQuotes = !foundSingleQuotes
-            } else if (ch == '\"') {
-              foundDoubleQuotes = !foundDoubleQuotes
-            }
-            else if (ch == '(' && !foundSingleQuotes && !foundDoubleQuotes) {
-              bracketCount = bracketCount + 1
-              stack.push(ch)
-            } else if (ch == ')' && !foundSingleQuotes && !foundDoubleQuotes) {
-              bracketCount = bracketCount + 1
-              stack.pop()
-              if (0 == stack.size) {
-                stop = true
-              }
-            }
-          }
-          true
-        }
-      }
-    }
-    if (bracketCount == 0 || bracketCount % 2 != 0) {
-      sys.error("Parsing error, missing bracket ")
-    }
-    val select = selectStatement.trim
-    (select.substring(1, select.length - 1).trim -> where.trim)
-  }
-
-
-  protected lazy val table: Parser[UnresolvedRelation] = {
-    rep1sep(attributeName, ".") ~ opt(ident) ^^ {
-      case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
-    }
-  }
-
-  protected lazy val attributeName: Parser[String] = acceptMatch("attribute name", {
-    case lexical.Identifier(str) => str.toLowerCase
-    case lexical.Keyword(str) if !lexical.delimiters.contains(str) => str.toLowerCase
-  })
-
-  private def updateRelation(
-      r: UnresolvedRelation,
-      tableIdentifier: Seq[String],
-      alias: Option[String]): UnresolvedRelation = {
-    alias match {
-      case Some(_) => r
-      case _ =>
-        val tableAlias = tableIdentifier match {
-          case Seq(dbName, tableName) => Some(tableName)
-          case Seq(tableName) => Some(tableName)
-        }
-        UnresolvedRelation(tableIdentifier, tableAlias)
-    }
-  }
-
-  private def getTableName(tableIdentifier: Seq[String]): String = {
-    if (tableIdentifier.size > 1) {
-      tableIdentifier(0) + "." + tableIdentifier(1)
-    } else {
-      tableIdentifier(0)
-    }
-  }
-
-  protected lazy val element: Parser[String] =
-    (ident <~ ".").? ~ ident ^^ {
-      case table ~ column => column.toLowerCase
-    }
-
-  protected lazy val useDatabase: Parser[LogicalPlan] =
-    USE ~> ident <~ opt(";") ^^ {
-      case databaseName => UseDatabase(s"use ${ databaseName.toLowerCase }")
-    }
-}
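
The splitQuery helper removed above separates the bracketed source-expression list of an
UPDATE ... SET (...) = (...) statement from its trailing filter. A simplified, self-contained
sketch of the same bracket/quote tracking idea (an illustration only, not the removed
implementation) could look like this:

object SplitQuerySketch {

  def split(rest: String): (String, String) = {
    var depth = 0         // current '(' nesting level
    var inSingle = false  // inside a '...' literal
    var inDouble = false  // inside a "..." literal
    var escaped = false   // previous character was a backslash
    var end = -1          // index of the ')' closing the first '('
    var i = 0
    while (i < rest.length && end < 0) {
      val ch = rest.charAt(i)
      if (escaped) {
        // escaped quotes and backslashes do not affect the scanner state
        escaped = false
      } else {
        ch match {
          case '\\' => escaped = true
          case '\'' if !inDouble => inSingle = !inSingle
          case '"' if !inSingle => inDouble = !inDouble
          case '(' if !inSingle && !inDouble => depth += 1
          case ')' if !inSingle && !inDouble =>
            depth -= 1
            if (depth == 0) end = i
          case _ =>
        }
      }
      i += 1
    }
    if (end < 0) sys.error("Parsing error, missing bracket")
    val bracketed = rest.substring(0, end + 1).trim
    (bracketed.substring(1, bracketed.length - 1).trim, rest.substring(end + 1).trim)
  }

  def main(args: Array[String]): Unit = {
    // tail of: UPDATE t SET (a, b) = (select x, y from s) where t.id = s.id
    println(split("(select x, y from s) where t.id = s.id"))
    // prints (select x, y from s,where t.id = s.id)
  }
}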

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala
deleted file mode 100644
index 065dfed..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan}
-import org.apache.spark.util.{ScalaCompilerUtil, Utils}
-
-private[sql] class CodeGenerateFactory(version: String) {
-
-  val optimizerFactory = if (version.equals("1.6.2") || version.equals("1.6.3")) {
-    ScalaCompilerUtil.compiledCode(CodeTemplates.spark1_6_OptimizerString)
-      .asInstanceOf[AbstractCarbonOptimizerFactory]
-  } else if (version.startsWith("1.6") || version.startsWith("1.5")) {
-    ScalaCompilerUtil.compiledCode(CodeTemplates.defaultOptimizerString)
-      .asInstanceOf[AbstractCarbonOptimizerFactory]
-  } else {
-    throw new UnsupportedOperationException(s"Spark version $version is not supported")
-  }
-
-  val expandFactory = if (version.startsWith("1.5")) {
-    ScalaCompilerUtil.compiledCode(CodeTemplates.spark1_5ExpandString)
-      .asInstanceOf[AbstractCarbonExpandFactory]
-  } else if (version.startsWith("1.6")) {
-    new AbstractCarbonExpandFactory {
-      override def createExpand(expand: Expand, child: LogicalPlan): Expand = {
-        val loader = Utils.getContextOrSparkClassLoader
-        try {
-          val cons = loader.loadClass("org.apache.spark.sql.catalyst.plans.logical.Expand")
-            .getDeclaredConstructors
-          cons.head.setAccessible(true)
-          cons.head.newInstance(expand.projections, expand.output, child).asInstanceOf[Expand]
-        } catch {
-          case e: Exception => null
-        }
-      }
-    }
-  } else {
-    throw new UnsupportedOperationException(s"Spark version $version is not supported")
-  }
-
-}
-
-object CodeGenerateFactory {
-
-  private var codeGenerateFactory: CodeGenerateFactory = _
-
-  def init(version: String): Unit = {
-    if (codeGenerateFactory == null) {
-      codeGenerateFactory = new CodeGenerateFactory(version)
-    }
-  }
-
-  def getInstance(): CodeGenerateFactory = {
-    codeGenerateFactory
-  }
-
-  def createDefaultOptimizer(conf: CatalystConf, sc: SparkContext): Optimizer = {
-    val name = "org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer"
-    val loader = Utils.getContextOrSparkClassLoader
-    try {
-      val cons = loader.loadClass(name + "$").getDeclaredConstructors
-      cons.head.setAccessible(true)
-      cons.head.newInstance().asInstanceOf[Optimizer]
-    } catch {
-      case e: Exception =>
-        loader.loadClass(name).getConstructor(classOf[CatalystConf])
-          .newInstance(conf).asInstanceOf[Optimizer]
-    }
-  }
-
-}
-
-object CodeTemplates {
-
-  val spark1_6_OptimizerString =
-    s"""
-       import org.apache.spark.sql._;
-       import org.apache.spark.sql.optimizer._;
-       import org.apache.spark.sql.catalyst.plans.logical._;
-       import org.apache.spark.sql.catalyst._;
-       import org.apache.spark.sql.catalyst.optimizer.Optimizer;
-
-       new AbstractCarbonOptimizerFactory {
-         override def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf): Optimizer = {
-           class CarbonOptimizer1(optimizer: Optimizer, conf: CarbonSQLConf)
-             extends Optimizer(conf) {
-             override val batches = Nil;
-             override def execute(plan: LogicalPlan): LogicalPlan = {
-               CarbonOptimizer.execute(plan, optimizer);
-             }
-           }
-           new CarbonOptimizer1(optimizer, conf);
-         }
-       }
-    """
-
-  val defaultOptimizerString =
-    s"""
-       import org.apache.spark.sql._;
-       import org.apache.spark.sql.optimizer._;
-       import org.apache.spark.sql.catalyst.plans.logical._;
-       import org.apache.spark.sql.catalyst._;
-       import org.apache.spark.sql.catalyst.optimizer.Optimizer;
-
-       new AbstractCarbonOptimizerFactory {
-         override def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf): Optimizer = {
-           class CarbonOptimizer2(optimizer: Optimizer, conf: CarbonSQLConf) extends Optimizer {
-             val batches = Nil;
-             override def execute(plan: LogicalPlan): LogicalPlan = {
-               CarbonOptimizer.execute(plan, optimizer);
-             }
-           }
-           new CarbonOptimizer2(optimizer, conf);
-         }
-       }
-    """
-
-  val spark1_5ExpandString =
-    s"""
-       import org.apache.spark.sql._
-       import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan}
-       new AbstractCarbonExpandFactory {
-         override def createExpand(expand: Expand, child: LogicalPlan): Expand = {
-           Expand(expand.bitmasks, expand.groupByExprs, expand.gid, child)
-         }
-       }
-    """
-}
-
-abstract class AbstractCarbonOptimizerFactory {
-  def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf) : Optimizer
-}
-
-abstract class AbstractCarbonExpandFactory {
-  def createExpand(expand: Expand, child: LogicalPlan) : Expand
-}
-
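
Both createDefaultOptimizer and the Spark 1.6 createExpand above construct Spark classes
reflectively to cope with constructor differences across Spark versions: load the class by
name, make a declared constructor accessible, and invoke it. A minimal, self-contained sketch
of that pattern (using java.lang.StringBuilder purely as a stand-in target so it runs without
Spark on the classpath) could look like this:

object ReflectiveConstructSketch {

  // Load className, pick a declared constructor whose parameters accept args,
  // make it accessible and instantiate it.
  def construct[T](className: String, args: AnyRef*): T = {
    val loader = Thread.currentThread().getContextClassLoader
    val constructor = loader.loadClass(className).getDeclaredConstructors
      .find { c =>
        c.getParameterTypes.length == args.length &&
        c.getParameterTypes.zip(args).forall { case (p, a) => p.isInstance(a) }
      }
      .getOrElse(sys.error(s"No matching constructor found for $className"))
    constructor.setAccessible(true)
    constructor.newInstance(args: _*).asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val sb = construct[java.lang.StringBuilder]("java.lang.StringBuilder", "carbon")
    println(sb.append("data"))  // prints carbondata
  }
}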

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
deleted file mode 100644
index d745be2..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
-import org.apache.spark.sql.types.{DataType, StringType}
-
-/**
- * Custom expression that wraps a non-deterministic expression and
- * reports it as deterministic.
- */
-case class CustomDeterministicExpression(nonDt: Expression) extends Expression with Serializable {
-  override def nullable: Boolean = true
-
-  override def eval(input: InternalRow): Any = null
-
-  override protected def genCode(ctx: CodeGenContext,
-      ev: GeneratedExpressionCode): String = ev.code
-  override def deterministic: Boolean = true
-
-  override def dataType: DataType = StringType
-
-  override def children: Seq[Expression] = Seq()
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
deleted file mode 100644
index dc2dd7b..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericMutableRow}
-
-import org.apache.carbondata.core.scan.expression.{ColumnExpression, Expression, ExpressionResult, UnknownExpression}
-import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException
-import org.apache.carbondata.core.scan.filter.intf.{ExpressionType, RowIntf}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-class SparkUnknownExpression(var sparkExp: SparkExpression)
-  extends UnknownExpression with ConditionalExpression {
-
-  private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
-  private var isExecutor: Boolean = false
-  children.addAll(getColumnList())
-
-  override def evaluate(carbonRowInstance: RowIntf): ExpressionResult = {
-
-    val values = carbonRowInstance.getValues.toSeq.map {
-      case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
-      case d: java.math.BigDecimal => org.apache.spark.sql.types.Decimal.apply(d)
-      case value => value
-    }
-    try {
-      val result = evaluateExpression(
-        new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
-      val sparkRes = if (isExecutor) {
-        result.asInstanceOf[InternalRow].get(0, sparkExp.dataType)
-      } else {
-        result
-      }
-      new ExpressionResult(CarbonScalaUtil.convertSparkToCarbonDataType(sparkExp.dataType),
-        sparkRes
-      )
-    } catch {
-      case e: Exception => throw new FilterUnsupportedException(e.getMessage)
-    }
-  }
-
-  override def getFilterExpressionType: ExpressionType = {
-    ExpressionType.UNKNOWN
-  }
-
-  override def getString: String = {
-    sparkExp.toString()
-  }
-
-  def setEvaluateExpression(evaluateExpression: (InternalRow) => Any): Unit = {
-    this.evaluateExpression = evaluateExpression
-    isExecutor = true
-  }
-
-  override def findAndSetChild(oldExpr: Expression, newExpr: Expression): Unit = {}
-
-  def getColumnList: java.util.List[ColumnExpression] = {
-
-    val lst = new java.util.ArrayList[ColumnExpression]()
-    getColumnListFromExpressionTree(sparkExp, lst)
-    lst
-  }
-  def getLiterals: java.util.List[ExpressionResult] = {
-
-    val lst = new java.util.ArrayList[ExpressionResult]()
-    lst
-  }
-
-  def getAllColumnList: java.util.List[ColumnExpression] = {
-    val lst = new java.util.ArrayList[ColumnExpression]()
-    getAllColumnListFromExpressionTree(sparkExp, lst)
-    lst
-  }
-
-  def isSingleColumn: Boolean = {
-    val lst = new java.util.ArrayList[ColumnExpression]()
-    getAllColumnListFromExpressionTree(sparkExp, lst)
-    if (lst.size == 1 && lst.get(0).isDimension) {
-      true
-    } else {
-      false
-    }
-  }
-
-  def getColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
-      list: java.util.List[ColumnExpression]): Unit = {
-    sparkCurrentExp match {
-      case carbonBoundRef: CarbonBoundReference =>
-        val foundExp = list.asScala
-          .find(p => p.getColumnName == carbonBoundRef.colExp.getColumnName)
-        if (foundExp.isEmpty) {
-          carbonBoundRef.colExp.setColIndex(list.size)
-          list.add(carbonBoundRef.colExp)
-        } else {
-          carbonBoundRef.colExp.setColIndex(foundExp.get.getColIndex)
-        }
-      case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
-    }
-  }
-
-
-  def getAllColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
-      list: java.util.List[ColumnExpression]): java.util.List[ColumnExpression] = {
-    sparkCurrentExp match {
-      case carbonBoundRef: CarbonBoundReference => list.add(carbonBoundRef.colExp)
-      case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
-    }
-    list
-  }
-
-}
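
Before delegating to the wrapped Spark expression, evaluate() above converts the JVM-level
values handed back by Carbon into Spark's internal representations: String becomes UTF8String
and java.math.BigDecimal becomes Spark's Decimal. A minimal sketch of just that mapping
(assuming spark-catalyst and spark-unsafe on the classpath; an illustration only, not the
removed class) could look like this:

import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.UTF8String

object RowValueConversionSketch {

  // Map a value coming from a Carbon row to the type Spark expression
  // evaluation expects; everything else passes through unchanged.
  def toSparkInternal(value: Any): Any = value match {
    case s: String => UTF8String.fromString(s)
    case d: java.math.BigDecimal => Decimal(d)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    println(toSparkInternal("carbon"))                         // carbon (as UTF8String)
    println(toSparkInternal(new java.math.BigDecimal("1.5")))  // 1.5 (as Decimal)
    println(toSparkInternal(42))                               // 42 (unchanged)
  }
}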