Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/10/19 10:33:08 UTC

[3/9] carbondata git commit: [CARBONDATA-1597] Remove spark1 integration

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
deleted file mode 100644
index 914203f..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ /dev/null
@@ -1,862 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.optimizer
-
-import java.util
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.ProjectForUpdateCommand
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.types.{IntegerType, StringType}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.stats.QueryStatistic
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
-import org.apache.carbondata.spark.{CarbonAliasDecoderRelation, CarbonFilters}
-
-/**
- * Carbon Optimizer to add dictionary decoder.
- */
-object CarbonOptimizer {
-
-  def optimizer(optimizer: Optimizer, conf: CarbonSQLConf, version: String): Optimizer = {
-    CodeGenerateFactory.getInstance().optimizerFactory.createOptimizer(optimizer, conf)
-  }
-
-  def execute(plan: LogicalPlan, optimizer: Optimizer): LogicalPlan = {
-    val executedPlan: LogicalPlan = optimizer.execute(plan)
-    val relations = CarbonOptimizer.collectCarbonRelation(plan)
-    if (relations.nonEmpty) {
-      new ResolveCarbonFunctions(relations).apply(executedPlan)
-    } else {
-      executedPlan
-    }
-  }
-
-  // get the carbon relations from the plan.
-  def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = {
-    plan collect {
-      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
-        CarbonDecoderRelation(l.attributeMap, l.relation.asInstanceOf[CarbonDatasourceRelation])
-    }
-  }
-}
-
-/**
- * It does two jobs: 1. changes the data type of dictionary-encoded columns, and 2. adds the
- * dictionary decoder plan.
- */
-class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
-  extends Rule[LogicalPlan] with PredicateHelper {
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-  def apply(logicalPlan: LogicalPlan): LogicalPlan = {
-    if (relations.nonEmpty && !isOptimized(logicalPlan)) {
-      val plan = processPlan(logicalPlan)
-      val udfTransformedPlan = pushDownUDFToJoinLeftRelation(plan)
-      LOGGER.info("Starting to optimize plan")
-      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
-      val queryStatistic = new QueryStatistic()
-      val result = transformCarbonPlan(udfTransformedPlan, relations)
-      queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ",
-        System.currentTimeMillis)
-      recorder.recordStatistics(queryStatistic)
-      recorder.logStatistics()
-      result
-    } else {
-      LOGGER.info("Skip CarbonOptimizer")
-      logicalPlan
-    }
-  }
-
-  private def processPlan(plan: LogicalPlan): LogicalPlan = {
-    plan transform {
-      case ProjectForUpdate(table, cols, Seq(updatePlan)) =>
-        var isTransformed = false
-        val newPlan = updatePlan transform {
-          case Project(pList, child) if (!isTransformed) =>
-            val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
-              .splitAt(pList.size - cols.size)
-            val diff = cols.diff(dest.map(_.name))
-            if (diff.size > 0) {
-              sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}")
-            }
-            isTransformed = true
-            Project(dest.filter(a => !cols.contains(a.name)) ++ source, child)
-        }
-        ProjectForUpdateCommand(newPlan, table.tableIdentifier)
-    }
-  }
-  private def pushDownUDFToJoinLeftRelation(plan: LogicalPlan): LogicalPlan = {
-    val output = plan match {
-      case proj@Project(cols, Join(
-      left, right, jointype: org.apache.spark.sql.catalyst.plans.JoinType, condition)) =>
-        var projectionToBeAdded: Seq[org.apache.spark.sql.catalyst.expressions.Alias] = Seq.empty
-        val newCols = cols.map { col =>
-          col match {
-            case a@Alias(s: ScalaUDF, name)
-              if (name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
-                  name.equalsIgnoreCase(
-                    CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) =>
-              projectionToBeAdded :+= a
-              AttributeReference(name, StringType, true)().withExprId(a.exprId)
-            case other => other
-          }
-        }
-        val newLeft = left match {
-          case Project(columns, logicalPlan) =>
-            Project(columns ++ projectionToBeAdded, logicalPlan)
-          case filter: Filter =>
-            Project(filter.output ++ projectionToBeAdded, filter)
-          case other => other
-        }
-        Project(newCols, Join(newLeft, right, jointype, condition))
-      case other => other
-    }
-    output
-  }
-  def isOptimized(plan: LogicalPlan): Boolean = {
-    plan find {
-      case cd: CarbonDictionaryCatalystDecoder => true
-      case ic: InsertIntoCarbonTable => true
-      case other => false
-    } isDefined
-  }
-
-  case class ExtraNodeInfo(var hasCarbonRelation: Boolean)
-
-  def fillNodeInfo(
-      plan: LogicalPlan,
-      extraNodeInfos: java.util.HashMap[LogicalPlan, ExtraNodeInfo]): ExtraNodeInfo = {
-    plan match {
-      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
-        val extraNodeInfo = ExtraNodeInfo(true)
-        extraNodeInfo
-      case others =>
-        val extraNodeInfo = ExtraNodeInfo(false)
-        others.children.foreach { childPlan =>
-          val childExtraNodeInfo = fillNodeInfo(childPlan, extraNodeInfos)
-          if (childExtraNodeInfo.hasCarbonRelation) {
-            extraNodeInfo.hasCarbonRelation = true
-          }
-        }
-        // only record plans that contain no carbon relation
-        if (!extraNodeInfo.hasCarbonRelation) {
-          extraNodeInfos.put(plan, extraNodeInfo)
-        }
-        extraNodeInfo
-    }
-  }
-
-  /**
-   * Steps for changing the plan.
-   * 1. It finds the join condition columns and dimension aggregate columns which need to
-   * be decoded just before that part of the plan executes.
-   * 2. The plan is then transformed by adding decoders wherever decoded data is needed,
-   * e.g. dimension aggregate column decoders under aggregates and join condition decoders
-   * under join children.
-   */
-  def transformCarbonPlan(plan: LogicalPlan,
-      relations: Seq[CarbonDecoderRelation]): LogicalPlan = {
-    if (plan.isInstanceOf[RunnableCommand]) {
-      return plan
-    }
-    var decoder = false
-    val mapOfNonCarbonPlanNodes = new java.util.HashMap[LogicalPlan, ExtraNodeInfo]
-    fillNodeInfo(plan, mapOfNonCarbonPlanNodes)
-    val aliasMap = CarbonAliasDecoderRelation()
-    // collect alias information beforehand.
-    collectInformationOnAttributes(plan, aliasMap)
-
-    def hasCarbonRelation(currentPlan: LogicalPlan): Boolean = {
-      val extraNodeInfo = mapOfNonCarbonPlanNodes.get(currentPlan)
-      if (extraNodeInfo == null) {
-        true
-      } else {
-        extraNodeInfo.hasCarbonRelation
-      }
-    }
-
-    val attrMap = new util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]()
-    relations.foreach(_.fillAttributeMap(attrMap))
-
-    def addTempDecoder(currentPlan: LogicalPlan): LogicalPlan = {
-
-      def transformAggregateExpression(agg: Aggregate,
-          aggonGroups: util.HashSet[AttributeReferenceWrapper] = null): LogicalPlan = {
-        val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
-        if (aggonGroups != null) {
-          attrsOndimAggs.addAll(aggonGroups)
-        }
-        agg.aggregateExpressions.map {
-          case attr: AttributeReference =>
-          case a@Alias(attr: AttributeReference, name) =>
-          case aggExp: AggregateExpression =>
-            aggExp.transform {
-              case aggExp: AggregateExpression =>
-                collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, attrMap)
-                aggExp
-            }
-          case others =>
-            others.collect {
-              case attr: AttributeReference
-                if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-            }
-        }
-        var child = agg.child
-        // In case the child is also an aggregate, push the decoder down to the child
-        if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
-          child = CarbonDictionaryTempDecoder(attrsOndimAggs,
-            new util.HashSet[AttributeReferenceWrapper](),
-            agg.child)
-        }
-        if (!decoder && aggonGroups == null) {
-          decoder = true
-          CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
-            new util.HashSet[AttributeReferenceWrapper](),
-            Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child),
-            isOuter = true)
-        } else {
-          Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
-        }
-      }
-
-      currentPlan match {
-        case limit@Limit(_, child: Sort) =>
-          if (!decoder) {
-            decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
-              new util.HashSet[AttributeReferenceWrapper](),
-              limit,
-              isOuter = true)
-          } else {
-            limit
-          }
-        case sort: Sort if !sort.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOnSort = new util.HashSet[AttributeReferenceWrapper]()
-          sort.order.map { s =>
-            s.collect {
-              case attr: AttributeReference
-                if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                attrsOnSort.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-            }
-          }
-          var child = sort.child
-          if (attrsOnSort.size() > 0 && !child.isInstanceOf[Sort]) {
-            child = CarbonDictionaryTempDecoder(attrsOnSort,
-              new util.HashSet[AttributeReferenceWrapper](), sort.child)
-          }
-          if (!decoder) {
-            decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
-              new util.HashSet[AttributeReferenceWrapper](),
-              Sort(sort.order, sort.global, child),
-              isOuter = true)
-          } else {
-            Sort(sort.order, sort.global, child)
-          }
-
-        case union: Union
-          if !(union.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
-               union.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
-          val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
-          val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
-          val leftLocalAliasMap = CarbonAliasDecoderRelation()
-          val rightLocalAliasMap = CarbonAliasDecoderRelation()
-          // collect alias information for each child plan again; it is required because the
-          // global alias map may contain duplicates when aliases are reused
-          collectInformationOnAttributes(union.left, leftLocalAliasMap)
-          collectInformationOnAttributes(union.right, rightLocalAliasMap)
-          union.left.output.foreach { attr =>
-            if (isDictionaryEncoded(attr, attrMap, leftLocalAliasMap)) {
-              leftCondAttrs.add(AttributeReferenceWrapper(leftLocalAliasMap.getOrElse(attr, attr)))
-            }
-          }
-          union.right.output.foreach { attr =>
-            if (isDictionaryEncoded(attr, attrMap, rightLocalAliasMap)) {
-              rightCondAttrs.add(
-                AttributeReferenceWrapper(rightLocalAliasMap.getOrElse(attr, attr)))
-            }
-          }
-          var leftPlan = union.left
-          var rightPlan = union.right
-          if (hasCarbonRelation(leftPlan) && leftCondAttrs.size() > 0 &&
-              !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-            leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
-              new util.HashSet[AttributeReferenceWrapper](),
-              union.left, isOuter = false, Some(leftLocalAliasMap))
-          }
-          if (hasCarbonRelation(rightPlan) && rightCondAttrs.size() > 0 &&
-              !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-            rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
-              new util.HashSet[AttributeReferenceWrapper](),
-              union.right, isOuter = false, Some(rightLocalAliasMap))
-          }
-          if (!decoder) {
-            decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
-              new util.HashSet[AttributeReferenceWrapper](),
-              Union(leftPlan, rightPlan),
-              isOuter = true)
-          } else {
-            Union(leftPlan, rightPlan)
-          }
-
-        case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          transformAggregateExpression(agg)
-        case expand: Expand if !expand.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOnExpand = new util.HashSet[AttributeReferenceWrapper]
-          expand.projections.map {s =>
-            s.map {
-              case attr: AttributeReference =>
-              case a@Alias(attr: AttributeReference, name) =>
-              case others =>
-                others.collect {
-                  case attr: AttributeReference
-                    if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                    attrsOnExpand.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-                }
-            }
-          }
-          var child = expand.child
-          if (attrsOnExpand.size() > 0 && !child.isInstanceOf[Expand]) {
-            child = CarbonDictionaryTempDecoder(attrsOnExpand,
-              new util.HashSet[AttributeReferenceWrapper](),
-              expand.child)
-          }
-          if (!decoder) {
-            decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
-              new util.HashSet[AttributeReferenceWrapper](),
-              CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child),
-              isOuter = true)
-          } else {
-            CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child)
-          }
-        case filter: Filter if !filter.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOnConds = new util.HashSet[AttributeReferenceWrapper]
-          // If the child is a join or a sort we cannot push the filters down, so decode them earlier
-          if (filter.child.isInstanceOf[Join] || filter.child.isInstanceOf[Sort]) {
-            filter.condition.collect {
-              case attr: AttributeReference =>
-                attrsOnConds.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-            }
-          } else {
-            CarbonFilters
-              .selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap)
-          }
-
-          var child = filter.child
-          if (attrsOnConds.size() > 0 && !child.isInstanceOf[Filter]) {
-            child = CarbonDictionaryTempDecoder(attrsOnConds,
-              new util.HashSet[AttributeReferenceWrapper](),
-              filter.child)
-          }
-
-          if (!decoder) {
-            decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
-              new util.HashSet[AttributeReferenceWrapper](),
-              Filter(filter.condition, child),
-              isOuter = true)
-          } else {
-            Filter(filter.condition, child)
-          }
-
-        case j: Join
-          if !(j.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
-               j.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
-          val attrsOnJoin = new util.HashSet[Attribute]
-          j.condition match {
-            case Some(expression) =>
-              expression.collect {
-                case attr: AttributeReference
-                  if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                  attrsOnJoin.add(aliasMap.getOrElse(attr, attr))
-              }
-            case _ =>
-          }
-
-          val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
-          val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
-          if (attrsOnJoin.size() > 0) {
-
-            attrsOnJoin.asScala.map { attr =>
-              if (qualifierPresence(j.left, attr)) {
-                leftCondAttrs.add(AttributeReferenceWrapper(attr))
-              }
-              if (qualifierPresence(j.right, attr)) {
-                rightCondAttrs.add(AttributeReferenceWrapper(attr))
-              }
-            }
-            var leftPlan = j.left
-            var rightPlan = j.right
-            if (leftCondAttrs.size() > 0 &&
-                !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-              leftPlan = leftPlan match {
-                case agg: Aggregate =>
-                  CarbonDictionaryTempDecoder(leftCondAttrs,
-                    new util.HashSet[AttributeReferenceWrapper](),
-                    transformAggregateExpression(agg, leftCondAttrs))
-                case _ =>
-                  CarbonDictionaryTempDecoder(leftCondAttrs,
-                    new util.HashSet[AttributeReferenceWrapper](),
-                    j.left)
-              }
-            }
-            if (rightCondAttrs.size() > 0 &&
-                !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-              rightPlan = rightPlan match {
-                case agg: Aggregate =>
-                  CarbonDictionaryTempDecoder(rightCondAttrs,
-                    new util.HashSet[AttributeReferenceWrapper](),
-                    transformAggregateExpression(agg, rightCondAttrs))
-                case _ =>
-                  CarbonDictionaryTempDecoder(rightCondAttrs,
-                    new util.HashSet[AttributeReferenceWrapper](),
-                    j.right)
-              }
-            }
-            if (!decoder) {
-              decoder = true
-              CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
-                new util.HashSet[AttributeReferenceWrapper](),
-                Join(leftPlan, rightPlan, j.joinType, j.condition),
-                isOuter = true)
-            } else {
-              Join(leftPlan, rightPlan, j.joinType, j.condition)
-            }
-          } else {
-            j
-          }
-
-        case p: Project
-          if relations.nonEmpty && !p.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOnProjects = new util.HashSet[AttributeReferenceWrapper]
-          p.projectList.map {
-            case attr: AttributeReference =>
-            case a@Alias(attr: AttributeReference, name) =>
-            case others =>
-              others.collect {
-                case attr: AttributeReference
-                  if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                  attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-              }
-          }
-          var child = p.child
-          if (attrsOnProjects.size() > 0 && !child.isInstanceOf[Project]) {
-            child = CarbonDictionaryTempDecoder(attrsOnProjects,
-              new util.HashSet[AttributeReferenceWrapper](),
-              p.child)
-          }
-          if (!decoder) {
-            decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
-              new util.HashSet[AttributeReferenceWrapper](),
-              Project(p.projectList, child),
-              isOuter = true)
-          } else {
-            Project(p.projectList, child)
-          }
-
-        case wd: Window if !wd.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOnProjects = new util.HashSet[AttributeReferenceWrapper]
-          wd.projectList.map {
-            case attr: AttributeReference =>
-            case others =>
-              others.collect {
-                case attr: AttributeReference
-                  if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                  attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-              }
-          }
-          wd.windowExpressions.map { others =>
-            others.collect {
-              case attr: AttributeReference
-                if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-            }
-          }
-          wd.partitionSpec.map{
-            case attr: AttributeReference =>
-            case others =>
-              others.collect {
-                case attr: AttributeReference
-                  if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                  attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-              }
-          }
-          wd.orderSpec.map { s =>
-            s.collect {
-              case attr: AttributeReference
-                if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-            }
-          }
-          wd.partitionSpec.map { s =>
-            s.collect {
-              case attr: AttributeReference
-                if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-            }
-          }
-          var child = wd.child
-          if (attrsOnProjects.size() > 0 && !child.isInstanceOf[Project]) {
-            child = CarbonDictionaryTempDecoder(attrsOnProjects,
-              new util.HashSet[AttributeReferenceWrapper](),
-              wd.child)
-          }
-          if (!decoder) {
-            decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
-              new util.HashSet[AttributeReferenceWrapper](),
-              Window(wd.projectList, wd.windowExpressions, wd.partitionSpec, wd.orderSpec, child),
-              isOuter = true)
-          } else {
-            Window(wd.projectList, wd.windowExpressions, wd.partitionSpec, wd.orderSpec, child)
-          }
-
-        case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
-          if (!decoder) {
-            decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
-              new util.HashSet[AttributeReferenceWrapper](), l, isOuter = true)
-          } else {
-            l
-          }
-
-        case others => others
-      }
-
-    }
-
-    val transFormedPlan =
-      plan transformDown {
-        case cd: CarbonDictionaryTempDecoder if cd.isOuter =>
-          decoder = true
-          cd
-        case currentPlan =>
-          if (hasCarbonRelation(currentPlan)) {
-            addTempDecoder(currentPlan)
-          } else {
-            currentPlan
-          }
-      }
-
-    val processor = new CarbonDecoderProcessor
-    processor.updateDecoders(processor.getDecoderList(transFormedPlan))
-    updateProjection(updateTempDecoder(transFormedPlan, aliasMap, attrMap))
-  }
-
-  private def updateTempDecoder(plan: LogicalPlan,
-      aliasMapOriginal: CarbonAliasDecoderRelation,
-      attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]):
-  LogicalPlan = {
-    var allAttrsNotDecode: util.Set[AttributeReferenceWrapper] =
-      new util.HashSet[AttributeReferenceWrapper]()
-    val marker = new CarbonPlanMarker
-    var aliasMap = aliasMapOriginal
-    plan transformDown {
-      case cd: CarbonDictionaryTempDecoder if !cd.processed =>
-        cd.processed = true
-        allAttrsNotDecode = cd.attrsNotDecode
-        aliasMap = cd.aliasMap.getOrElse(aliasMap)
-        marker.pushMarker(allAttrsNotDecode)
-        if (cd.isOuter) {
-          CarbonDictionaryCatalystDecoder(relations,
-            ExcludeProfile(cd.getAttrsNotDecode.asScala.toSeq),
-            aliasMap,
-            isOuter = true,
-            cd.child)
-        } else {
-          CarbonDictionaryCatalystDecoder(relations,
-            IncludeProfile(cd.getAttrList.asScala.toSeq),
-            aliasMap,
-            isOuter = false,
-            cd.child)
-        }
-      case cd: CarbonDictionaryCatalystDecoder =>
-        cd
-      case sort: Sort =>
-        val sortExprs = sort.order.map { s =>
-          s.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-          }.asInstanceOf[SortOrder]
-        }
-        Sort(sortExprs, sort.global, sort.child)
-      case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
-        val aggExps = agg.aggregateExpressions.map { aggExp =>
-          aggExp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-          }
-        }.asInstanceOf[Seq[NamedExpression]]
-
-        val grpExps = agg.groupingExpressions.map { gexp =>
-          gexp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-          }
-        }
-        Aggregate(grpExps, aggExps, agg.child)
-      case expand: Expand =>
-        expand.transformExpressions {
-          case attr: AttributeReference =>
-            updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-        }
-      case filter: Filter =>
-        val filterExps = filter.condition transform {
-          case attr: AttributeReference =>
-            updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-        }
-        Filter(filterExps, filter.child)
-      case j: Join =>
-        marker.pushBinaryMarker(allAttrsNotDecode)
-        j
-      case u: Union =>
-        marker.pushBinaryMarker(allAttrsNotDecode)
-        u
-      case p: Project if relations.nonEmpty =>
-        val prExps = p.projectList.map { prExp =>
-          prExp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-          }
-        }.asInstanceOf[Seq[NamedExpression]]
-        Project(prExps, p.child)
-      case wd: Window if relations.nonEmpty =>
-        val prExps = wd.projectList.map { prExp =>
-          prExp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-          }
-        }.asInstanceOf[Seq[Attribute]]
-        val wdExps = wd.windowExpressions.map { gexp =>
-          gexp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-          }
-        }.asInstanceOf[Seq[NamedExpression]]
-        val partitionSpec = wd.partitionSpec.map{ exp =>
-          exp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-          }
-        }
-        val orderSpec = wd.orderSpec.map { exp =>
-          exp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-          }
-        }.asInstanceOf[Seq[SortOrder]]
-        Window(prExps, wdExps, partitionSpec, orderSpec, wd.child)
-
-      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
-        allAttrsNotDecode = marker.revokeJoin()
-        l
-      case others => others
-    }
-  }
-
-  private def updateProjection(plan: LogicalPlan): LogicalPlan = {
-    val transFormedPlan = plan transform {
-      case p@Project(projectList: Seq[NamedExpression], cd: CarbonDictionaryCatalystDecoder) =>
-        if (cd.child.isInstanceOf[Filter] || cd.child.isInstanceOf[LogicalRelation]) {
-          Project(projectList: Seq[NamedExpression], cd.child)
-        } else {
-          p
-        }
-      case f@Filter(condition: Expression, cd: CarbonDictionaryCatalystDecoder) =>
-        if (cd.child.isInstanceOf[Project] || cd.child.isInstanceOf[LogicalRelation]) {
-          Filter(condition, cd.child)
-        } else {
-          f
-        }
-    }
-    // Remove unnecessary decoders
-    val finalPlan = transFormedPlan transform {
-      case CarbonDictionaryCatalystDecoder(_, profile, _, false, child)
-        if profile.isInstanceOf[IncludeProfile] && profile.isEmpty =>
-        child
-    }
-    val updateDtrFn = finalPlan transform {
-      case p@Project(projectList: Seq[NamedExpression], cd) =>
-        if (cd.isInstanceOf[Filter] || cd.isInstanceOf[LogicalRelation]) {
-          p.transformAllExpressions {
-            case a@Alias(exp, _)
-              if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
-              Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers,
-                a.explicitMetadata)
-            case exp: NamedExpression
-              if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
-              CustomDeterministicExpression(exp)
-          }
-        } else {
-          p
-        }
-      case f@Filter(condition: Expression, cd) =>
-        if (cd.isInstanceOf[Project] || cd.isInstanceOf[LogicalRelation]) {
-          f.transformAllExpressions {
-            case a@Alias(exp, _)
-              if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
-              Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers,
-                a.explicitMetadata)
-            case exp: NamedExpression
-              if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
-              CustomDeterministicExpression(exp)
-          }
-        } else {
-          f
-        }
-    }
-
-    updateDtrFn
-  }
-
-  private def collectInformationOnAttributes(plan: LogicalPlan,
-      aliasMap: CarbonAliasDecoderRelation) {
-    plan transformAllExpressions  {
-      case a@Alias(exp, name) =>
-        exp match {
-          case attr: Attribute => aliasMap.put(a.toAttribute, attr)
-          case _ => aliasMap.put(a.toAttribute, AttributeReference("", StringType)())
-        }
-        a
-    }
-    // collect the output of Expand and register its projection attributes as aliases.
-    plan.collect {
-      case expand: Expand =>
-        expand.projections.foreach {s =>
-          s.zipWithIndex.foreach { f =>
-            f._1 match {
-              case attr: AttributeReference =>
-                aliasMap.put(expand.output(f._2).toAttribute, attr)
-              case a@Alias(attr: AttributeReference, name) =>
-                aliasMap.put(expand.output(f._2).toAttribute, attr)
-              case others =>
-            }
-          }
-        }
-    }
-  }
-
-  // Collect aggregates on dimensions so that we can add decoders to them.
-  private def collectDimensionAggregates(aggExp: AggregateExpression,
-      attrsOndimAggs: util.HashSet[AttributeReferenceWrapper],
-      aliasMap: CarbonAliasDecoderRelation,
-      attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]) {
-    aggExp collect {
-      case attr: AttributeReference if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-        attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-    }
-  }
-
-  /**
-   * Update the attribute data type to [[IntegerType]] if the carbon column is dictionary
-   * encoded.
-   *
-   */
-  private def updateDataType(attr: Attribute,
-      attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation],
-      allAttrsNotDecode: java.util.Set[AttributeReferenceWrapper],
-      aliasMap: CarbonAliasDecoderRelation): Attribute = {
-    val uAttr = aliasMap.getOrElse(attr, attr)
-    val relation = Option(attrMap.get(AttributeReferenceWrapper(uAttr)))
-    if (relation.isDefined) {
-      relation.get.dictionaryMap.get(uAttr.name) match {
-        case Some(true)
-          if !allAttrsNotDecode.contains(AttributeReferenceWrapper(uAttr)) =>
-          val newAttr = AttributeReference(attr.name,
-            IntegerType,
-            attr.nullable,
-            attr.metadata)(attr.exprId, attr.qualifiers)
-          newAttr
-        case _ => attr
-      }
-    } else {
-      attr
-    }
-  }
-
-  private def isDictionaryEncoded(attr: Attribute,
-      attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation],
-      aliasMap: CarbonAliasDecoderRelation): Boolean = {
-    val uAttr = aliasMap.getOrElse(attr, attr)
-    val relation = Option(attrMap.get(AttributeReferenceWrapper(uAttr)))
-    if (relation.isDefined) {
-      relation.get.dictionaryMap.get(uAttr.name) match {
-        case Some(true) => true
-        case _ => false
-      }
-    } else {
-      false
-    }
-  }
-
-  def qualifierPresence(plan: LogicalPlan, attr: Attribute): Boolean = {
-    var present = false
-    plan collect {
-      case l: LogicalRelation if l.attributeMap.contains(attr) =>
-        present = true
-    }
-    present
-  }
-}
-
-case class CarbonDecoderRelation(
-    attributeMap: AttributeMap[AttributeReference],
-    carbonRelation: CarbonDatasourceRelation) {
-
-  val extraAttrs = new ArrayBuffer[Attribute]()
-
-  def addAttribute(attr: Attribute): Unit = {
-    extraAttrs += attr
-  }
-
-  def contains(attr: Attribute): Boolean = {
-    val exists =
-      attributeMap.exists(entry => entry._1.name.equalsIgnoreCase(attr.name) &&
-                                   entry._1.exprId.equals(attr.exprId)) ||
-      extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) &&
-                                 entry.exprId.equals(attr.exprId))
-    exists
-  }
-
-  def fillAttributeMap(attrMap: java.util.HashMap[AttributeReferenceWrapper,
-    CarbonDecoderRelation]): Unit = {
-    attributeMap.foreach { attr =>
-      attrMap.put(AttributeReferenceWrapper(attr._1), this)
-    }
-  }
-
-  lazy val dictionaryMap = carbonRelation.carbonRelation.metaData.dictionaryMap
-}
-
-

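For context on the removal above: the heart of the deleted optimizer, per its own class comments, is a Catalyst rule that first collects the Carbon relations from a logical plan and then rewrites the plan top-down, wrapping nodes in dictionary decoders. A minimal sketch of that collect-then-rewrite shape follows; the rule name and the no-op rewrite are illustrative, not CarbonData code.

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.datasources.LogicalRelation

    // Hypothetical skeleton mirroring the shape of ResolveCarbonFunctions:
    // collect the relations of interest, then transform the plan top-down.
    object CollectAndRewrite extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = {
        val relations = plan.collect { case l: LogicalRelation => l }
        if (relations.isEmpty) {
          plan // no matching relations, leave the plan untouched
        } else {
          plan transformDown {
            // the real rule inserted CarbonDictionaryTempDecoder nodes here
            case other => other
          }
        }
      }
    }
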
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala
deleted file mode 100644
index bb00126..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.test
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{CarbonContext, DataFrame, SQLContext}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * This class is a SQL executor for unit test cases on Spark 1.x.
- */
-
-class SparkTestQueryExecutor extends TestQueryExecutorRegister {
-  override def sql(sqlText: String): DataFrame = SparkTestQueryExecutor.cc.sql(sqlText)
-
-  override def sqlContext: SQLContext = SparkTestQueryExecutor.cc
-
-  override def stop(): Unit = SparkTestQueryExecutor.cc.sparkContext.stop()
-}
-
-object SparkTestQueryExecutor {
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  LOGGER.info("use TestQueryExecutorImplV1")
-  CarbonProperties.getInstance()
-    .addProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
-      System.getProperty("java.io.tmpdir"))
-    .addProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL)
-    .addProperty(CarbonCommonConstants.STORE_LOCATION, TestQueryExecutor.storeLocation)
-
-  val sc = new SparkContext(new SparkConf()
-    .setAppName("CarbonSpark")
-    .setMaster("local[2]")
-    .set("spark.sql.shuffle.partitions", "20"))
-  sc.setLogLevel("ERROR")
-
-  val cc = new CarbonContext(sc, TestQueryExecutor.storeLocation, TestQueryExecutor.metastoredb)
-}

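For context: tests never referenced SparkTestQueryExecutor directly; it was discovered through the META-INF/services entry deleted further below. A sketch of that lookup, assuming the implementation is resolved with java.util.ServiceLoader:

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.test.TestQueryExecutorRegister

    // Illustrative: load the first registered executor implementation
    // and run a statement through it.
    val executor: TestQueryExecutorRegister =
      ServiceLoader.load(classOf[TestQueryExecutorRegister])
        .iterator().asScala.next()
    val df = executor.sql("SELECT 1")
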
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala
deleted file mode 100644
index e73f78c..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.util
-
-import org.apache.spark.TaskContext
-
-
-object TaskContextUtil {
-  def setTaskContext(context: TaskContext): Unit = {
-    val localThreadContext = TaskContext.get()
-    if (localThreadContext == null) {
-      TaskContext.setTaskContext(context)
-    }
-  }
-}

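For context: TaskContext lives in a thread-local, so code that spawns its own worker threads inside a Spark task loses the context unless it is re-set, which is all the utility above did. A usage sketch; the threading code is illustrative:

    import org.apache.spark.TaskContext
    import org.apache.spark.util.TaskContextUtil

    // Capture the context on the task's own thread...
    val context = TaskContext.get()
    new Thread(new Runnable {
      override def run(): Unit = {
        // ...and restore it on the child thread, where TaskContext.get()
        // would otherwise return null.
        TaskContextUtil.setTaskContext(context)
      }
    }).start()
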
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
deleted file mode 100644
index d09c9b5..0000000
--- a/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ /dev/null
@@ -1,17 +0,0 @@
-## ------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements.  See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License.  You may obtain a copy of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ------------------------------------------------------------------------
-org.apache.spark.sql.CarbonSource
\ No newline at end of file

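For context: this services file is the standard registration that lets Spark resolve a data source by short name, mapping format("carbondata") to org.apache.spark.sql.CarbonSource. A minimal sketch of the interface side of that contract; the class is illustrative and omits the relation-provider traits a real source also implements:

    import org.apache.spark.sql.sources.DataSourceRegister

    // A class listed in META-INF/services/...DataSourceRegister can be
    // addressed by its short name, e.g. sqlContext.read.format("carbondata").
    class ExampleSource extends DataSourceRegister {
      override def shortName(): String = "carbondata"
    }
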
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.test.TestQueryExecutorRegister
----------------------------------------------------------------------
diff --git a/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.test.TestQueryExecutorRegister b/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.test.TestQueryExecutorRegister
deleted file mode 100644
index fc96db4..0000000
--- a/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.test.TestQueryExecutorRegister
+++ /dev/null
@@ -1,17 +0,0 @@
-## ------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements.  See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License.  You may obtain a copy of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ------------------------------------------------------------------------
-org.apache.spark.sql.test.SparkTestQueryExecutor
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/resources/badrecords/test2.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/badrecords/test2.csv b/integration/spark/src/test/resources/badrecords/test2.csv
deleted file mode 100644
index 51d25b2..0000000
--- a/integration/spark/src/test/resources/badrecords/test2.csv
+++ /dev/null
@@ -1,4 +0,0 @@
-0,569,silo
-1,843658743265874365874365874365584376547598375987,hello
-2,87436587349436587568784658743065874376,priyal
-3,56985,simple
\ No newline at end of file

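For context: the second column of the deleted fixture holds values far outside 64-bit integer range, which is what makes rows 1 and 2 "bad records" when the column is loaded as a bounded integer type (an assumption about the consuming test's schema). An illustrative check:

    // Parsing one of the oversized values fails, which marks the row bad.
    val v = "843658743265874365874365874365584376547598375987"
    val parsed = scala.util.Try(v.toLong) // Failure(NumberFormatException)
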
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala
deleted file mode 100644
index aaaf66b..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.integration.spark.testsuite.complexType
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * Test class for creating and loading a carbon table with timestamp columns inside complex types.
- *
- */
-class TestComplexPrimitiveTimestampDirectDictionary extends QueryTest with BeforeAndAfterAll {
-
-  override def beforeAll: Unit = {
-    sql("drop table if exists complexcarbontimestamptable")
-    sql("drop table if exists complexhivetimestamptable")
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss.SSS")
-    sql("CREATE TABLE complexcarbontimestamptable (empno string,workdate Timestamp,punchinout array<Timestamp>, worktime struct<begintime:Timestamp, endtime:Timestamp>, salary double) STORED BY 'org.apache.carbondata.format'")
-    sql(s"LOAD DATA local inpath '$resourcesPath/datasamplecomplex.csv' INTO TABLE complexcarbontimestamptable OPTIONS" +
-        "('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='empno,workdate,punchinout,worktime,salary')");
-    sql("CREATE TABLE complexhivetimestamptable (empno string,workdate Timestamp,punchinout array<Timestamp>, worktime struct<begintime:Timestamp, endtime:Timestamp>, salary double)row format delimited fields terminated by ',' collection items terminated by '$'")
-    sql(s"LOAD DATA local inpath '$resourcesPath/datasamplecomplex.csv' INTO TABLE complexhivetimestamptable")
-  }
-
-  test("select * query") {
-     checkAnswer(sql("select * from complexcarbontimestamptable"),
-     sql("select * from complexhivetimestamptable"))
-  }
-
-  test("timestamp complex type in the middle of complex types") {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss.SSS")
-    sql("CREATE TABLE testtimestampcarbon(imei string,rat array<string>, sid array<int>, end_time array<Timestamp>, probeid array<double>, contact struct<name:string, id:string>)STORED BY 'org.apache.carbondata.format'")
-    sql("LOAD DATA local inpath '" + resourcesPath + "/timestampdata.csv' INTO TABLE testtimestampcarbon options('DELIMITER'=',', 'QUOTECHAR'='\"','COMPLEX_DELIMITER_LEVEL_1'='$', 'FILEHEADER'='imei,rat,sid,end_time,probeid,contact')")
-    sql("CREATE TABLE testtimestamphive(imei string,rat array<string>, sid array<int>, end_time array<Timestamp>, probeid array<double>, contact struct<name:string, id:string>)row format delimited fields terminated by ',' collection items terminated by '$'")
-    sql(s"LOAD DATA local inpath '$resourcesPath/timestampdata.csv' INTO TABLE testtimestamphive")
-    checkAnswer(sql("select * from testtimestampcarbon"), sql("select * from testtimestamphive"))
-    sql("drop table if exists testtimestampcarbon")
-    sql("drop table if exists testtimestamphive")
-  }
-
-  override def afterAll {
-    sql("drop table if exists complexcarbontimestamptable")
-    sql("drop table if exists complexhivetimestamptable")
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
deleted file mode 100644
index 98e4f18..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.integration.spark.testsuite.dataload
-
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
-import org.scalatest.BeforeAndAfterAll
-
-class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll {
-
-  var df: DataFrame = _
-
-  override def beforeAll {
-    sql("DROP TABLE IF EXISTS carbon1")
-
-    import sqlContext.implicits._
-    df = sqlContext.sparkContext.parallelize(1 to 1000)
-        .map(x => ("a", "b", x))
-        .toDF("c1", "c2", "c3")
-
-    // save dataframe to carbon file
-    df.write
-        .format("carbondata")
-        .option("tableName", "carbon1")
-        .mode(SaveMode.Overwrite)
-        .save()
-  }
-
-  test("read and write using CarbonContext") {
-    val in = sqlContext.read
-        .format("carbondata")
-        .option("tableName", "carbon1")
-        .load()
-
-    assert(in.where("c3 > 500").count() == 500)
-  }
-
-  test("read and write using CarbonContext with compression") {
-    val in = sqlContext.read
-        .format("carbondata")
-        .option("tableName", "carbon1")
-        .option("compress", "true")
-        .load()
-
-    assert(in.where("c3 > 500").count() == 500)
-  }
-
-  test("test overwrite") {
-    sql("DROP TABLE IF EXISTS carbon4")
-    df.write
-        .format("carbondata")
-        .option("tableName", "carbon4")
-        .mode(SaveMode.Overwrite)
-        .save()
-    df.write
-        .format("carbondata")
-        .option("tableName", "carbon4")
-        .mode(SaveMode.Overwrite)
-        .save()
-    val in = sqlContext.read
-        .format("carbondata")
-        .option("tableName", "carbon4")
-        .load()
-    assert(in.where("c3 > 500").count() == 500)
-    sql("DROP TABLE IF EXISTS carbon4")
-  }
-
-  test("read and write using CarbonContext, multiple load") {
-    sql("DROP TABLE IF EXISTS carbon4")
-    df.write
-        .format("carbondata")
-        .option("tableName", "carbon4")
-        .mode(SaveMode.Overwrite)
-        .save()
-    df.write
-        .format("carbondata")
-        .option("tableName", "carbon4")
-        .mode(SaveMode.Append)
-        .save()
-    val in = sqlContext.read
-        .format("carbondata")
-        .option("tableName", "carbon4")
-        .load()
-    assert(in.where("c3 > 500").count() == 1000)
-    sql("DROP TABLE IF EXISTS carbon4")
-  }
-  
-  test("query using SQLContext") {
-    val newSQLContext = new SQLContext(sqlContext.sparkContext)
-    newSQLContext.sql(
-      s"""
-         | CREATE TEMPORARY TABLE temp
-         | (c1 string, c2 string, c3 int)
-         | USING org.apache.spark.sql.CarbonSource
-         | OPTIONS (path '$storeLocation/default/carbon1')
-      """.stripMargin)
-    checkAnswer(newSQLContext.sql(
-      """
-        | SELECT c1, c2, count(*)
-        | FROM temp
-        | WHERE c3 > 100
-        | GROUP BY c1, c2
-      """.stripMargin), Seq(Row("a", "b", 900)))
-    newSQLContext.dropTempTable("temp")
-  }
-
-  test("query using SQLContext without providing schema") {
-    val newSQLContext = new SQLContext(sqlContext.sparkContext)
-    newSQLContext.sql(
-      s"""
-         | CREATE TEMPORARY TABLE temp
-         | USING org.apache.spark.sql.CarbonSource
-         | OPTIONS (path '$storeLocation/default/carbon1')
-      """.stripMargin)
-    checkAnswer(newSQLContext.sql(
-      """
-        | SELECT c1, c2, count(*)
-        | FROM temp
-        | WHERE c3 > 100
-        | GROUP BY c1, c2
-      """.stripMargin), Seq(Row("a", "b", 900)))
-    newSQLContext.dropTempTable("temp")
-  }
-
-  test("query using SQLContext, multiple load") {
-    sql("DROP TABLE IF EXISTS test")
-    sql(
-      """
-        | CREATE TABLE test(id int, name string, city string, age int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-    val testData = s"${resourcesPath}/sample.csv"
-    sql(s"LOAD DATA LOCAL INPATH '$testData' into table test")
-    sql(s"LOAD DATA LOCAL INPATH '$testData' into table test")
-
-    val newSQLContext = new SQLContext(sqlContext.sparkContext)
-    newSQLContext.sql(
-      s"""
-         | CREATE TEMPORARY TABLE temp
-         | (id int, name string, city string, age int)
-         | USING org.apache.spark.sql.CarbonSource
-         | OPTIONS (path '$storeLocation/default/test')
-      """.stripMargin)
-    checkAnswer(newSQLContext.sql(
-      """
-        | SELECT count(id)
-        | FROM temp
-      """.stripMargin), Seq(Row(8)))
-    newSQLContext.dropTempTable("temp")
-    sql("DROP TABLE test")
-  }
-
-  test("json data with long datatype issue CARBONDATA-405") {
-    val jsonDF = sqlContext.read.format("json").load(s"$resourcesPath/test.json")
-    jsonDF.write
-      .format("carbondata")
-      .option("tableName", "dftesttable")
-      .option("compress", "true")
-      .mode(SaveMode.Overwrite)
-      .save()
-    val carbonDF = sqlContext
-      .read
-      .format("carbondata")
-      .option("tableName", "dftesttable")
-      .load()
-    checkAnswer(
-      carbonDF.select("age", "name"),
-      jsonDF.select("age", "name"))
-    sql("drop table dftesttable")
-  }
-
-  override def afterAll {
-    sql("DROP TABLE IF EXISTS carbon1")
-    sql("DROP TABLE IF EXISTS carbon2")
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala
deleted file mode 100644
index b61ecce..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.integration.spark.testsuite.dataload
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-/**
- * Test class for data loading when there is a single quote in the fact data.
- */
-class TestLoadDataWithSingleQuotechar extends QueryTest with BeforeAndAfterAll {
-  override def beforeAll {
-    sql("DROP TABLE IF EXISTS carbontable")
-    sql(
-      "CREATE TABLE carbontable (id Int, name String) STORED BY 'carbondata'")
-  }
-
-  test("test data loading with single quote char") {
-    try {
-      sql(
-        s"LOAD DATA LOCAL INPATH '$resourcesPath/dataWithSingleQuote.csv' INTO TABLE " +
-          "carbontable OPTIONS('DELIMITER'= ',')")
-      sql("SELECT * from carbontable")
-      checkAnswer(
-        sql("SELECT * from carbontable"),
-        Seq(Row(1,"Tom"),
-          Row(2,"Tony\n3,Lily"),
-          Row(4,"Games\""),
-          Row(5,"prival\"\n6,\"hello\"")
-        )
-      )
-    } catch {
-      case e: Throwable =>
-        assert(false)
-    }
-  }
-
-  override def afterAll {
-    sql("DROP TABLE carbontable")
-  }
-}
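
One detail worth spelling out from the expected rows above: the load uses the default QUOTECHAR of '"', so an unbalanced quote in the fact data makes the parser absorb the record separator, which is why records 2/3 and 5/6 collapse into single values with embedded newlines. A hedged sketch of making the quote character explicit in the load options (the table name and path are illustrative, and `sql` is assumed to be the usual QueryTest helper):

    // Illustrative names only; the parsing outcome still depends on how the
    // CSV data quotes its fields.
    sql("CREATE TABLE quote_demo (id INT, name STRING) STORED BY 'carbondata'")
    sql(
      """LOAD DATA LOCAL INPATH '/tmp/dataWithSingleQuote.csv' INTO TABLE quote_demo
        |OPTIONS('DELIMITER'=',', 'QUOTECHAR'='"')
      """.stripMargin)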

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllQueriesSpark1TestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllQueriesSpark1TestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllQueriesSpark1TestCase.scala
deleted file mode 100644
index 1c003c7..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllQueriesSpark1TestCase.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.allqueries
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{Row, SaveMode}
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
-  * Test class for all queries on multiple data types.
-  */
-class AllQueriesSpark1TestCase extends QueryTest with BeforeAndAfterAll {
-
-  override def beforeAll {
-    clean
-
-    sql("create table if not exists Carbon_automation_test (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active
 _phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_INCLUDE'='Latest_MONTH,Latest_DAY,deviceInformationId')")
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-    sql("LOAD DATA LOCAL INPATH '" + resourcesPath + "/100_olap.csv' INTO table Carbon_automation_test options('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'= 'imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Lat
 est_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')")
-  }
-
-  def clean {
-    sql("drop table if exists Carbon_automation_test")
-  }
-
-  override def afterAll {
-    clean
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
-  }
-
-  //TC_113
-  test("select percentile_approx(deviceInformationId,0.2) as a  from Carbon_automation_test")({
-    checkAnswer(
-      sql("select percentile_approx(deviceInformationId,0.2) as a  from Carbon_automation_test"),
-      Seq(Row(100005.8)))
-  })
-
-}
\ No newline at end of file
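
percentile_approx in TC_113 above is the standard Hive/Spark aggregate evaluated over a Carbon scan rather than anything Carbon-specific; 100005.8 is simply the expected value for the 100_olap.csv data set. A standalone sketch of the same probe, assuming the table created above and a Carbon-enabled `sql` handle:

    // Approximate 20th percentile of deviceInformationId; the aggregate's
    // accuracy parameter is left at its default.
    val p20 = sql(
      "select percentile_approx(deviceInformationId, 0.2) from Carbon_automation_test")
      .first().getDouble(0)
    println(s"approx 20th percentile: $p20")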

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableSpark1TestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableSpark1TestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableSpark1TestCase.scala
deleted file mode 100644
index d762ec6..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableSpark1TestCase.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.testsuite.allqueries
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-class InsertIntoCarbonTableSpark1TestCase extends QueryTest with BeforeAndAfterAll {
-  override def beforeAll {
-    sql("drop table if exists THive")
-    sql("create table THive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions st
 ring, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
-    sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE THive")
-  }
-
-  test("insert from carbon-select columns-source table has more column then target column") {
-    val timeStampPropOrig = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
-     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-
-     sql("drop table if exists load")
-     sql("drop table if exists inser")
-     sql("CREATE TABLE load(imei string,age int,task bigint,num double,level decimal(10,3),productdate timestamp,name string,point int)STORED BY 'org.apache.carbondata.format'")
-     sql("LOAD DATA INPATH '" + resourcesPath + "/shortolap.csv' INTO TABLE load options ('DELIMITER'=',', 'QUOTECHAR'='\"','FILEHEADER' = 'imei,age,task,num,level,productdate,name,point')")
-     sql("CREATE TABLE inser(imei string,age int,task bigint,num double,level decimal(10,3),productdate timestamp)STORED BY 'org.apache.carbondata.format'")
-     sql("insert into inser select * from load")
-     checkAnswer(
-         sql("select * from inser"),
-         sql("select imei,age,task,num,level,productdate from load")
-     )
-     sql("drop table if exists load")
-     sql("drop table if exists inser")
-     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeStampPropOrig)
-  }
-
-  test("insert->hive column more than carbon column->success") {
-     sql("drop table if exists TCarbon")
-     sql("create table TCarbon (imei string,deviceInformationId int,MAC string,deviceColor string,gamePointId double,contractNumber BigInt) STORED BY 'org.apache.carbondata.format'")
-
-     sql("insert into TCarbon select imei,deviceInformationId,MAC,deviceColor,gamePointId,contractNumber,device_backColor,modelId,CUPAudit,CPIClocked from THive")
-     checkAnswer(
-         sql("select imei,deviceInformationId,MAC,deviceColor,gamePointId,contractNumber from THive"),
-         sql("select imei,deviceInformationId,MAC,deviceColor,gamePointId,contractNumber from TCarbon")
-     )
-  }
-
-//  test("insert->insert empty data -pass") {
-//     sql("drop table if exists TCarbon")
-//     sql("create table TCarbon (imei string,deviceInformationId int,MAC string) STORED BY 'org.apache.carbondata.format'")
-//     sql("insert into TCarbon select imei,deviceInformationId,MAC from THive where MAC='wrongdata'")
-//     val result = sql("select imei,deviceInformationId,MAC from TCarbon where MAC='wrongdata'").collect()
-//     checkAnswer(
-//         sql("select imei,deviceInformationId,MAC from THive where MAC='wrongdata'"),
-//         sql("select imei,deviceInformationId,MAC from TCarbon where MAC='wrongdata'")
-//     )
-//  }
-
-  override def afterAll {
-    sql("drop table if exists load")
-    sql("drop table if exists inser")
-    sql("DROP TABLE IF EXISTS THive")
-    sql("DROP TABLE IF EXISTS TCarbon")
-  }
-}
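
Two patterns in this suite are worth carrying over to the remaining Spark 2 integration: INSERT INTO ... SELECT tolerates a source projection wider than the target schema (the extra trailing columns are dropped, which the second test checks), and timestamp-format overrides are saved and restored around the load. A sketch of a leak-proof version of the save/restore pattern; the property key and constants are real, the surrounding structure is illustrative:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    val props = CarbonProperties.getInstance()
    // Read the current value, falling back to the default so the restore
    // below never writes a null.
    val original = props.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    props.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    try {
      // ... run the loads and inserts under the overridden format ...
    } finally {
      // Restores the prior format even if a load throws.
      props.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, original)
    }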

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
deleted file mode 100644
index 7aee00d..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.badrecordloger
-
-import java.io.File
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * Test class for bad record logging on a table with shared dictionary columns.
- */
-class BadRecordLoggerSharedDictionaryTest extends QueryTest with BeforeAndAfterAll {
-  var csvFilePath : String = null
-  var timestamp_format: String = null
-
-  override def beforeAll {
-      sql("drop table IF EXISTS testdrive")
-    sql(
-      """create table testdrive (ID int,CUST_ID int,cust_name string)
-          STORED BY 'org.apache.carbondata.format'
-            TBLPROPERTIES("columnproperties.cust_name.shared_column"="shared.cust_name",
-            "columnproperties.ID.shared_column"="shared.ID",
-            "columnproperties.CUST_ID.shared_column"="shared.CUST_ID",
-            'DICTIONARY_INCLUDE'='ID,CUST_ID')"""
-    )
-
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        new File("./target/test/badRecords")
-          .getCanonicalPath
-      )
-
-    val carbonProp = CarbonProperties.getInstance()
-    timestamp_format = carbonProp.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-    carbonProp.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/mm/dd")
-    val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
-      .getCanonicalPath
-    csvFilePath = currentDirectory + "/src/test/resources/badrecords/test2.csv"
-  }
-
-  test("dataload with bad record test") {
-    try {
-      sql(
-        s"""LOAD DATA INPATH '$csvFilePath' INTO TABLE testdrive OPTIONS('DELIMITER'=',',
-            |'QUOTECHAR'= '"', 'BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='FAIL',
-            |'FILEHEADER'= 'ID,CUST_ID,cust_name')""".stripMargin)
-      // BAD_RECORDS_ACTION='FAIL' must abort the load, so reaching this point
-      // means the bad record was silently accepted.
-      assert(false, "load should have failed due to the bad record")
-    } catch {
-      case e: Throwable =>
-        assert(e.getMessage.contains("Data load failed due to bad record"))
-    }
-  }
-
-  override def afterAll {
-    sql("drop table IF EXISTS testdrive")
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timestamp_format)
-  }
-}
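
The configuration driven here is the core of the bad-records path: CARBON_BADRECORDS_LOC points the logger at a writable directory, BAD_RECORDS_LOGGER_ENABLE turns logging on per load, and BAD_RECORDS_ACTION='FAIL' converts a malformed row into a load failure. A hedged configuration sketch; the path and table name are illustrative, and BAD_RECORDS_ACTION also accepts values that skip or redirect bad rows instead of failing, not shown here:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Bad-record logs are written under this directory.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/badrecords")

    // With 'FAIL', a single bad row aborts the entire load.
    sql(
      """LOAD DATA INPATH '/tmp/test2.csv' INTO TABLE testdrive
        |OPTIONS('BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='FAIL')
      """.stripMargin)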

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createtable/TestCreateTableSyntax.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createtable/TestCreateTableSyntax.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createtable/TestCreateTableSyntax.scala
deleted file mode 100644
index 7400839..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createtable/TestCreateTableSyntax.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.createtable
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
-/**
- * Test class for validating CREATE TABLE syntax for carbon tables.
- */
-class TestCreateTableSyntax extends QueryTest with BeforeAndAfterAll {
-
-  test("Struct field with underscore and struct<struct> syntax check") {
-    sql("drop table if exists carbontable")
-    sql("create table carbontable(id int, username struct<sur_name:string," +
-        "actual_name:struct<first_name:string,last_name:string>>, country string, salary double)" +
-        "STORED BY 'org.apache.carbondata.format'")
-    sql("describe carbontable")
-  }
-
-  test("Test table rename operation on carbon table and on hive table") {
-    sql("drop table if exists hivetable")
-    sql("drop table if exists carbontable")
-    sql("drop table if exists hiveRenamedTable")
-    sql("drop table if exists carbonRenamedTable")
-    sql("create table hivetable(test1 int, test2 array<String>,test3 array<bigint>,"+
-        "test4 array<int>,test5 array<decimal>,test6 array<timestamp>,test7 array<double>)"+
-        "row format delimited fields terminated by ',' collection items terminated by '$' map keys terminated by ':'")
-    sql("alter table hivetable rename To hiveRenamedTable")
-    sql("create table carbontable(test1 int, test2 array<String>,test3 array<bigint>,"+
-        "test4 array<int>,test5 array<decimal>,test6 array<timestamp>,test7 array<double>)"+
-        "STORED BY 'org.apache.carbondata.format'")
-    sql("alter table carbontable compact 'minor'")
-    try {
-      sql("alter table carbontable rename To carbonRenamedTable")
-      assert(false)
-    } catch {
-      case e : MalformedCarbonCommandException => {
-        assert(e.getMessage.equals("Unsupported alter operation on carbon table"))
-      }
-    }
-  }
-
-  test("test carbon table create with complex datatype as dictionary exclude") {
-    try {
-      sql("drop table if exists carbontable")
-      sql("create table carbontable(id int, name string, dept string, mobile array<string>, "+
-          "country string, salary double) STORED BY 'org.apache.carbondata.format' " +
-          "TBLPROPERTIES('DICTIONARY_EXCLUDE'='dept,mobile')")
-      assert(false)
-    } catch {
-      case e : MalformedCarbonCommandException => {
-        assert(e.getMessage.equals("DICTIONARY_EXCLUDE is unsupported for complex datatype column: mobile"))
-      }
-    }
-  }
-
-  test("test carbon table create with double datatype as dictionary exclude") {
-    try {
-      sql("drop table if exists carbontable")
-      sql("create table carbontable(id int, name string, dept string, mobile array<string>, "+
-        "country string, salary double) STORED BY 'org.apache.carbondata.format' " +
-        "TBLPROPERTIES('DICTIONARY_EXCLUDE'='salary')")
-      assert(false)
-    } catch {
-      case e : MalformedCarbonCommandException => {
-        assert(e.getMessage.equals("DICTIONARY_EXCLUDE is unsupported for double " +
-          "data type column: salary"))
-      }
-    }
-  }
-  test("test carbon table create with int datatype as dictionary exclude") {
-    sql("drop table if exists carbontable")
-    sql("create table carbontable(id int, name string, dept string, mobile array<string>, " +
-        "country string, salary double) STORED BY 'org.apache.carbondata.format' " +
-        "TBLPROPERTIES('DICTIONARY_EXCLUDE'='id')")
-    assert(true)
-  }
-
-  test("test carbon table create with decimal datatype as dictionary exclude") {
-    try {
-      sql("drop table if exists carbontable")
-      sql("create table carbontable(id int, name string, dept string, mobile array<string>, "+
-        "country string, salary decimal) STORED BY 'org.apache.carbondata.format' " +
-        "TBLPROPERTIES('DICTIONARY_EXCLUDE'='salary')")
-      assert(false)
-    } catch {
-      case e : MalformedCarbonCommandException => {
-        assert(e.getMessage.equals("DICTIONARY_EXCLUDE is unsupported for decimal " +
-          "data type column: salary"))
-      }
-    }
-  }
-
-  test("describe formatted on hive table and carbon table") {
-    sql("drop table if exists hivetable")
-    sql("drop table if exists carbontable")
-    sql("create table carbontable(id int, username struct<sur_name:string," +
-        "actual_name:struct<first_name:string,last_name:string>>, country string, salary double)" +
-        "STORED BY 'org.apache.carbondata.format'")
-    sql("describe formatted carbontable")
-    sql("create table hivetable(id int, username struct<sur_name:string," +
-        "actual_name:struct<first_name:string,last_name:string>>, country string, salary double)")
-    sql("describe formatted hivetable")
-  }
-
-  test("describe command carbon table for decimal scale and precision test") {
-    sql("drop table if exists carbontablePrecision")
-    sql("create table carbontablePrecision(id int, name string, dept string, mobile array<string>, "+
-        "country string, salary decimal(10,6)) STORED BY 'org.apache.carbondata.format' " +
-        "TBLPROPERTIES('DICTIONARY_INCLUDE'='salary,id')")
-    checkAnswer(
-      sql("describe carbontablePrecision"),
-      Seq(Row("country","string",""),
-        Row("dept","string",""),Row("id","int",""),Row("mobile","array<string>",""),Row("name","string",""),
-        Row("salary","decimal(10,6)","")
-      )
-    )
-  }
-
-  test("create carbon table without dimensions") {
-    try {
-      sql("drop table if exists carbontable")
-      sql("create table carbontable(msr1 int, msr2 double, msr3 bigint, msr4 decimal)" +
-        " stored by 'org.apache.carbondata.format'")
-      assert(true)
-    } catch {
-      case e : MalformedCarbonCommandException => {
-        assert(e.getMessage.equals("Table default.carbontable can not be created without " +
-          "key columns. Please use DICTIONARY_INCLUDE or DICTIONARY_EXCLUDE to " +
-          "set at least one key column if all specified columns are numeric types"))
-      }
-    }
-  }
-
-  test("create carbon table with repeated table properties") {
-    try {
-      sql("drop table if exists carbontable")
-      sql(
-        """
-          CREATE TABLE IF NOT EXISTS carbontable
-          (ID Int, date Timestamp, country String,
-          name String, phonetype String, serialname String, salary Int)
-          STORED BY 'carbondata'
-          TBLPROPERTIES('DICTIONARY_EXCLUDE'='country','DICTIONARY_INCLUDE'='ID',
-          'DICTIONARY_EXCLUDE'='phonetype', 'DICTIONARY_INCLUDE'='salary')
-        """)
-      assert(false)
-    } catch {
-      case e : MalformedCarbonCommandException => {
-        assert(e.getMessage.equals("Table properties is repeated: dictionary_include,dictionary_exclude"))
-      }
-    }
-  }
-
-  override def afterAll {
-    sql("drop table if exists hivetable")
-    sql("drop table if exists carbontable")
-    sql("drop table if exists hiveRenamedTable")
-    sql("drop table if exists carbonRenamedTable")
-    sql("drop table if exists carbontablePrecision")
-  }
-}
\ No newline at end of file
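
Taken together, the cases above pin down the TBLPROPERTIES validation rules: DICTIONARY_EXCLUDE rejects complex, double and decimal columns while accepting int, each property key may appear at most once, and an all-numeric table can be rejected with a "key columns" error unless one column is nominated through DICTIONARY_INCLUDE or DICTIONARY_EXCLUDE. A minimal sketch of a definition that satisfies all of those rules at once (the table name is illustrative):

    // 'country' is a plain string, so excluding it from the dictionary is
    // legal; 'id' is an int, which both INCLUDE and EXCLUDE accept. No
    // property key is repeated and the table is not purely numeric.
    sql(
      """CREATE TABLE IF NOT EXISTS dict_demo
        |(id INT, country STRING, salary DOUBLE)
        |STORED BY 'carbondata'
        |TBLPROPERTIES('DICTIONARY_EXCLUDE'='country', 'DICTIONARY_INCLUDE'='id')
      """.stripMargin)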