You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/30 09:22:07 UTC
[17/35] carbondata git commit: [CARBONDATA-1597] Remove spark1
integration
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
deleted file mode 100644
index 914203f..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ /dev/null
@@ -1,862 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.optimizer
-
-import java.util
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.ProjectForUpdateCommand
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.types.{IntegerType, StringType}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.stats.QueryStatistic
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
-import org.apache.carbondata.spark.{CarbonAliasDecoderRelation, CarbonFilters}
-
-/**
- * Carbon Optimizer to add dictionary decoder.
- */
-object CarbonOptimizer {
-
- def optimizer(optimizer: Optimizer, conf: CarbonSQLConf, version: String): Optimizer = {
- CodeGenerateFactory.getInstance().optimizerFactory.createOptimizer(optimizer, conf)
- }
-
- def execute(plan: LogicalPlan, optimizer: Optimizer): LogicalPlan = {
- val executedPlan: LogicalPlan = optimizer.execute(plan)
- val relations = CarbonOptimizer.collectCarbonRelation(plan)
- if (relations.nonEmpty) {
- new ResolveCarbonFunctions(relations).apply(executedPlan)
- } else {
- executedPlan
- }
- }
-
- // get the carbon relation from plan.
- def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = {
- plan collect {
- case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
- CarbonDecoderRelation(l.attributeMap, l.relation.asInstanceOf[CarbonDatasourceRelation])
- }
- }
-}
-
-/**
- * It does two jobs. 1. Change the datatype for dictionary encoded column 2. Add the dictionary
- * decoder plan.
- */
-class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
- extends Rule[LogicalPlan] with PredicateHelper {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- def apply(logicalPlan: LogicalPlan): LogicalPlan = {
- if (relations.nonEmpty && !isOptimized(logicalPlan)) {
- val plan = processPlan(logicalPlan)
- val udfTransformedPlan = pushDownUDFToJoinLeftRelation(plan)
- LOGGER.info("Starting to optimize plan")
- val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
- val queryStatistic = new QueryStatistic()
- val result = transformCarbonPlan(udfTransformedPlan, relations)
- queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ",
- System.currentTimeMillis)
- recorder.recordStatistics(queryStatistic)
- recorder.logStatistics()
- result
- } else {
- LOGGER.info("Skip CarbonOptimizer")
- logicalPlan
- }
- }
-
- private def processPlan(plan: LogicalPlan): LogicalPlan = {
- plan transform {
- case ProjectForUpdate(table, cols, Seq(updatePlan)) =>
- var isTransformed = false
- val newPlan = updatePlan transform {
- case Project(pList, child) if (!isTransformed) =>
- val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
- .splitAt(pList.size - cols.size)
- val diff = cols.diff(dest.map(_.name))
- if (diff.size > 0) {
- sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}")
- }
- isTransformed = true
- Project(dest.filter(a => !cols.contains(a.name)) ++ source, child)
- }
- ProjectForUpdateCommand(newPlan, table.tableIdentifier)
- }
- }
- private def pushDownUDFToJoinLeftRelation(plan: LogicalPlan): LogicalPlan = {
- val output = plan match {
- case proj@Project(cols, Join(
- left, right, jointype: org.apache.spark.sql.catalyst.plans.JoinType, condition)) =>
- var projectionToBeAdded: Seq[org.apache.spark.sql.catalyst.expressions.Alias] = Seq.empty
- val newCols = cols.map { col =>
- col match {
- case a@Alias(s: ScalaUDF, name)
- if (name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
- name.equalsIgnoreCase(
- CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) =>
- projectionToBeAdded :+= a
- AttributeReference(name, StringType, true)().withExprId(a.exprId)
- case other => other
- }
- }
- val newLeft = left match {
- case Project(columns, logicalPlan) =>
- Project(columns ++ projectionToBeAdded, logicalPlan)
- case filter: Filter =>
- Project(filter.output ++ projectionToBeAdded, filter)
- case other => other
- }
- Project(newCols, Join(newLeft, right, jointype, condition))
- case other => other
- }
- output
- }
- def isOptimized(plan: LogicalPlan): Boolean = {
- plan find {
- case cd: CarbonDictionaryCatalystDecoder => true
- case ic: InsertIntoCarbonTable => true
- case other => false
- } isDefined
- }
-
- case class ExtraNodeInfo(var hasCarbonRelation: Boolean)
-
- def fillNodeInfo(
- plan: LogicalPlan,
- extraNodeInfos: java.util.HashMap[LogicalPlan, ExtraNodeInfo]): ExtraNodeInfo = {
- plan match {
- case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
- val extraNodeInfo = ExtraNodeInfo(true)
- extraNodeInfo
- case others =>
- val extraNodeInfo = ExtraNodeInfo(false)
- others.children.foreach { childPlan =>
- val childExtraNodeInfo = fillNodeInfo(childPlan, extraNodeInfos)
- if (childExtraNodeInfo.hasCarbonRelation) {
- extraNodeInfo.hasCarbonRelation = true
- }
- }
- // only put no carbon realtion plan
- if (!extraNodeInfo.hasCarbonRelation) {
- extraNodeInfos.put(plan, extraNodeInfo)
- }
- extraNodeInfo
- }
- }
-
- /**
- * Steps for changing the plan.
- * 1. It finds out the join condition columns and dimension aggregate columns which are need to
- * be decoded just before that plan executes.
- * 2. Plan starts transform by adding the decoder to the plan where it needs the decoded data
- * like dimension aggregate columns decoder under aggregator and join condition decoder under
- * join children.
- */
- def transformCarbonPlan(plan: LogicalPlan,
- relations: Seq[CarbonDecoderRelation]): LogicalPlan = {
- if (plan.isInstanceOf[RunnableCommand]) {
- return plan
- }
- var decoder = false
- val mapOfNonCarbonPlanNodes = new java.util.HashMap[LogicalPlan, ExtraNodeInfo]
- fillNodeInfo(plan, mapOfNonCarbonPlanNodes)
- val aliasMap = CarbonAliasDecoderRelation()
- // collect alias information before hand.
- collectInformationOnAttributes(plan, aliasMap)
-
- def hasCarbonRelation(currentPlan: LogicalPlan): Boolean = {
- val extraNodeInfo = mapOfNonCarbonPlanNodes.get(currentPlan)
- if (extraNodeInfo == null) {
- true
- } else {
- extraNodeInfo.hasCarbonRelation
- }
- }
-
- val attrMap = new util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]()
- relations.foreach(_.fillAttributeMap(attrMap))
-
- def addTempDecoder(currentPlan: LogicalPlan): LogicalPlan = {
-
- def transformAggregateExpression(agg: Aggregate,
- aggonGroups: util.HashSet[AttributeReferenceWrapper] = null): LogicalPlan = {
- val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
- if (aggonGroups != null) {
- attrsOndimAggs.addAll(aggonGroups)
- }
- agg.aggregateExpressions.map {
- case attr: AttributeReference =>
- case a@Alias(attr: AttributeReference, name) =>
- case aggExp: AggregateExpression =>
- aggExp.transform {
- case aggExp: AggregateExpression =>
- collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, attrMap)
- aggExp
- }
- case others =>
- others.collect {
- case attr: AttributeReference
- if isDictionaryEncoded(attr, attrMap, aliasMap) =>
- attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- }
- var child = agg.child
- // Incase if the child also aggregate then push down decoder to child
- if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
- child = CarbonDictionaryTempDecoder(attrsOndimAggs,
- new util.HashSet[AttributeReferenceWrapper](),
- agg.child)
- }
- if (!decoder && aggonGroups == null) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
- new util.HashSet[AttributeReferenceWrapper](),
- Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child),
- isOuter = true)
- } else {
- Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
- }
- }
-
- currentPlan match {
- case limit@Limit(_, child: Sort) =>
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
- new util.HashSet[AttributeReferenceWrapper](),
- limit,
- isOuter = true)
- } else {
- limit
- }
- case sort: Sort if !sort.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOnSort = new util.HashSet[AttributeReferenceWrapper]()
- sort.order.map { s =>
- s.collect {
- case attr: AttributeReference
- if isDictionaryEncoded(attr, attrMap, aliasMap) =>
- attrsOnSort.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- }
- var child = sort.child
- if (attrsOnSort.size() > 0 && !child.isInstanceOf[Sort]) {
- child = CarbonDictionaryTempDecoder(attrsOnSort,
- new util.HashSet[AttributeReferenceWrapper](), sort.child)
- }
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
- new util.HashSet[AttributeReferenceWrapper](),
- Sort(sort.order, sort.global, child),
- isOuter = true)
- } else {
- Sort(sort.order, sort.global, child)
- }
-
- case union: Union
- if !(union.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
- union.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
- val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
- val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
- val leftLocalAliasMap = CarbonAliasDecoderRelation()
- val rightLocalAliasMap = CarbonAliasDecoderRelation()
- // collect alias information for the child plan again. It is required as global alias
- // may have duplicated in case of aliases
- collectInformationOnAttributes(union.left, leftLocalAliasMap)
- collectInformationOnAttributes(union.right, rightLocalAliasMap)
- union.left.output.foreach { attr =>
- if (isDictionaryEncoded(attr, attrMap, leftLocalAliasMap)) {
- leftCondAttrs.add(AttributeReferenceWrapper(leftLocalAliasMap.getOrElse(attr, attr)))
- }
- }
- union.right.output.foreach { attr =>
- if (isDictionaryEncoded(attr, attrMap, rightLocalAliasMap)) {
- rightCondAttrs.add(
- AttributeReferenceWrapper(rightLocalAliasMap.getOrElse(attr, attr)))
- }
- }
- var leftPlan = union.left
- var rightPlan = union.right
- if (hasCarbonRelation(leftPlan) && leftCondAttrs.size() > 0 &&
- !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
- leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
- new util.HashSet[AttributeReferenceWrapper](),
- union.left, isOuter = false, Some(leftLocalAliasMap))
- }
- if (hasCarbonRelation(rightPlan) && rightCondAttrs.size() > 0 &&
- !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
- rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
- new util.HashSet[AttributeReferenceWrapper](),
- union.right, isOuter = false, Some(rightLocalAliasMap))
- }
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
- new util.HashSet[AttributeReferenceWrapper](),
- Union(leftPlan, rightPlan),
- isOuter = true)
- } else {
- Union(leftPlan, rightPlan)
- }
-
- case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- transformAggregateExpression(agg)
- case expand: Expand if !expand.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOnExpand = new util.HashSet[AttributeReferenceWrapper]
- expand.projections.map {s =>
- s.map {
- case attr: AttributeReference =>
- case a@Alias(attr: AttributeReference, name) =>
- case others =>
- others.collect {
- case attr: AttributeReference
- if isDictionaryEncoded(attr, attrMap, aliasMap) =>
- attrsOnExpand.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- }
- }
- var child = expand.child
- if (attrsOnExpand.size() > 0 && !child.isInstanceOf[Expand]) {
- child = CarbonDictionaryTempDecoder(attrsOnExpand,
- new util.HashSet[AttributeReferenceWrapper](),
- expand.child)
- }
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
- new util.HashSet[AttributeReferenceWrapper](),
- CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child),
- isOuter = true)
- } else {
- CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child)
- }
- case filter: Filter if !filter.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOnConds = new util.HashSet[AttributeReferenceWrapper]
- // In case the child is join then we cannot push down the filters so decode them earlier
- if (filter.child.isInstanceOf[Join] || filter.child.isInstanceOf[Sort]) {
- filter.condition.collect {
- case attr: AttributeReference =>
- attrsOnConds.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- } else {
- CarbonFilters
- .selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap)
- }
-
- var child = filter.child
- if (attrsOnConds.size() > 0 && !child.isInstanceOf[Filter]) {
- child = CarbonDictionaryTempDecoder(attrsOnConds,
- new util.HashSet[AttributeReferenceWrapper](),
- filter.child)
- }
-
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
- new util.HashSet[AttributeReferenceWrapper](),
- Filter(filter.condition, child),
- isOuter = true)
- } else {
- Filter(filter.condition, child)
- }
-
- case j: Join
- if !(j.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
- j.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
- val attrsOnJoin = new util.HashSet[Attribute]
- j.condition match {
- case Some(expression) =>
- expression.collect {
- case attr: AttributeReference
- if isDictionaryEncoded(attr, attrMap, aliasMap) =>
- attrsOnJoin.add(aliasMap.getOrElse(attr, attr))
- }
- case _ =>
- }
-
- val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
- val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
- if (attrsOnJoin.size() > 0) {
-
- attrsOnJoin.asScala.map { attr =>
- if (qualifierPresence(j.left, attr)) {
- leftCondAttrs.add(AttributeReferenceWrapper(attr))
- }
- if (qualifierPresence(j.right, attr)) {
- rightCondAttrs.add(AttributeReferenceWrapper(attr))
- }
- }
- var leftPlan = j.left
- var rightPlan = j.right
- if (leftCondAttrs.size() > 0 &&
- !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
- leftPlan = leftPlan match {
- case agg: Aggregate =>
- CarbonDictionaryTempDecoder(leftCondAttrs,
- new util.HashSet[AttributeReferenceWrapper](),
- transformAggregateExpression(agg, leftCondAttrs))
- case _ =>
- CarbonDictionaryTempDecoder(leftCondAttrs,
- new util.HashSet[AttributeReferenceWrapper](),
- j.left)
- }
- }
- if (rightCondAttrs.size() > 0 &&
- !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
- rightPlan = rightPlan match {
- case agg: Aggregate =>
- CarbonDictionaryTempDecoder(rightCondAttrs,
- new util.HashSet[AttributeReferenceWrapper](),
- transformAggregateExpression(agg, rightCondAttrs))
- case _ =>
- CarbonDictionaryTempDecoder(rightCondAttrs,
- new util.HashSet[AttributeReferenceWrapper](),
- j.right)
- }
- }
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
- new util.HashSet[AttributeReferenceWrapper](),
- Join(leftPlan, rightPlan, j.joinType, j.condition),
- isOuter = true)
- } else {
- Join(leftPlan, rightPlan, j.joinType, j.condition)
- }
- } else {
- j
- }
-
- case p: Project
- if relations.nonEmpty && !p.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOnProjects = new util.HashSet[AttributeReferenceWrapper]
- p.projectList.map {
- case attr: AttributeReference =>
- case a@Alias(attr: AttributeReference, name) =>
- case others =>
- others.collect {
- case attr: AttributeReference
- if isDictionaryEncoded(attr, attrMap, aliasMap) =>
- attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- }
- var child = p.child
- if (attrsOnProjects.size() > 0 && !child.isInstanceOf[Project]) {
- child = CarbonDictionaryTempDecoder(attrsOnProjects,
- new util.HashSet[AttributeReferenceWrapper](),
- p.child)
- }
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
- new util.HashSet[AttributeReferenceWrapper](),
- Project(p.projectList, child),
- isOuter = true)
- } else {
- Project(p.projectList, child)
- }
-
- case wd: Window if !wd.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOnProjects = new util.HashSet[AttributeReferenceWrapper]
- wd.projectList.map {
- case attr: AttributeReference =>
- case others =>
- others.collect {
- case attr: AttributeReference
- if isDictionaryEncoded(attr, attrMap, aliasMap) =>
- attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- }
- wd.windowExpressions.map { others =>
- others.collect {
- case attr: AttributeReference
- if isDictionaryEncoded(attr, attrMap, aliasMap) =>
- attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- }
- wd.partitionSpec.map{
- case attr: AttributeReference =>
- case others =>
- others.collect {
- case attr: AttributeReference
- if isDictionaryEncoded(attr, attrMap, aliasMap) =>
- attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- }
- wd.orderSpec.map { s =>
- s.collect {
- case attr: AttributeReference
- if isDictionaryEncoded(attr, attrMap, aliasMap) =>
- attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- }
- wd.partitionSpec.map { s =>
- s.collect {
- case attr: AttributeReference
- if isDictionaryEncoded(attr, attrMap, aliasMap) =>
- attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- }
- var child = wd.child
- if (attrsOnProjects.size() > 0 && !child.isInstanceOf[Project]) {
- child = CarbonDictionaryTempDecoder(attrsOnProjects,
- new util.HashSet[AttributeReferenceWrapper](),
- wd.child)
- }
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
- new util.HashSet[AttributeReferenceWrapper](),
- Window(wd.projectList, wd.windowExpressions, wd.partitionSpec, wd.orderSpec, child),
- isOuter = true)
- } else {
- Window(wd.projectList, wd.windowExpressions, wd.partitionSpec, wd.orderSpec, child)
- }
-
- case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
- new util.HashSet[AttributeReferenceWrapper](), l, isOuter = true)
- } else {
- l
- }
-
- case others => others
- }
-
- }
-
- val transFormedPlan =
- plan transformDown {
- case cd: CarbonDictionaryTempDecoder if cd.isOuter =>
- decoder = true
- cd
- case currentPlan =>
- if (hasCarbonRelation(currentPlan)) {
- addTempDecoder(currentPlan)
- } else {
- currentPlan
- }
- }
-
- val processor = new CarbonDecoderProcessor
- processor.updateDecoders(processor.getDecoderList(transFormedPlan))
- updateProjection(updateTempDecoder(transFormedPlan, aliasMap, attrMap))
- }
-
- private def updateTempDecoder(plan: LogicalPlan,
- aliasMapOriginal: CarbonAliasDecoderRelation,
- attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]):
- LogicalPlan = {
- var allAttrsNotDecode: util.Set[AttributeReferenceWrapper] =
- new util.HashSet[AttributeReferenceWrapper]()
- val marker = new CarbonPlanMarker
- var aliasMap = aliasMapOriginal
- plan transformDown {
- case cd: CarbonDictionaryTempDecoder if !cd.processed =>
- cd.processed = true
- allAttrsNotDecode = cd.attrsNotDecode
- aliasMap = cd.aliasMap.getOrElse(aliasMap)
- marker.pushMarker(allAttrsNotDecode)
- if (cd.isOuter) {
- CarbonDictionaryCatalystDecoder(relations,
- ExcludeProfile(cd.getAttrsNotDecode.asScala.toSeq),
- aliasMap,
- isOuter = true,
- cd.child)
- } else {
- CarbonDictionaryCatalystDecoder(relations,
- IncludeProfile(cd.getAttrList.asScala.toSeq),
- aliasMap,
- isOuter = false,
- cd.child)
- }
- case cd: CarbonDictionaryCatalystDecoder =>
- cd
- case sort: Sort =>
- val sortExprs = sort.order.map { s =>
- s.transform {
- case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }.asInstanceOf[SortOrder]
- }
- Sort(sortExprs, sort.global, sort.child)
- case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
- val aggExps = agg.aggregateExpressions.map { aggExp =>
- aggExp.transform {
- case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
- }.asInstanceOf[Seq[NamedExpression]]
-
- val grpExps = agg.groupingExpressions.map { gexp =>
- gexp.transform {
- case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
- }
- Aggregate(grpExps, aggExps, agg.child)
- case expand: Expand =>
- expand.transformExpressions {
- case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
- case filter: Filter =>
- val filterExps = filter.condition transform {
- case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
- Filter(filterExps, filter.child)
- case j: Join =>
- marker.pushBinaryMarker(allAttrsNotDecode)
- j
- case u: Union =>
- marker.pushBinaryMarker(allAttrsNotDecode)
- u
- case p: Project if relations.nonEmpty =>
- val prExps = p.projectList.map { prExp =>
- prExp.transform {
- case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
- }.asInstanceOf[Seq[NamedExpression]]
- Project(prExps, p.child)
- case wd: Window if relations.nonEmpty =>
- val prExps = wd.projectList.map { prExp =>
- prExp.transform {
- case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
- }.asInstanceOf[Seq[Attribute]]
- val wdExps = wd.windowExpressions.map { gexp =>
- gexp.transform {
- case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
- }.asInstanceOf[Seq[NamedExpression]]
- val partitionSpec = wd.partitionSpec.map{ exp =>
- exp.transform {
- case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
- }
- val orderSpec = wd.orderSpec.map { exp =>
- exp.transform {
- case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
- }.asInstanceOf[Seq[SortOrder]]
- Window(prExps, wdExps, partitionSpec, orderSpec, wd.child)
-
- case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
- allAttrsNotDecode = marker.revokeJoin()
- l
- case others => others
- }
- }
-
- private def updateProjection(plan: LogicalPlan): LogicalPlan = {
- val transFormedPlan = plan transform {
- case p@Project(projectList: Seq[NamedExpression], cd: CarbonDictionaryCatalystDecoder) =>
- if (cd.child.isInstanceOf[Filter] || cd.child.isInstanceOf[LogicalRelation]) {
- Project(projectList: Seq[NamedExpression], cd.child)
- } else {
- p
- }
- case f@Filter(condition: Expression, cd: CarbonDictionaryCatalystDecoder) =>
- if (cd.child.isInstanceOf[Project] || cd.child.isInstanceOf[LogicalRelation]) {
- Filter(condition, cd.child)
- } else {
- f
- }
- }
- // Remove unnecessary decoders
- val finalPlan = transFormedPlan transform {
- case CarbonDictionaryCatalystDecoder(_, profile, _, false, child)
- if profile.isInstanceOf[IncludeProfile] && profile.isEmpty =>
- child
- }
- val updateDtrFn = finalPlan transform {
- case p@Project(projectList: Seq[NamedExpression], cd) =>
- if (cd.isInstanceOf[Filter] || cd.isInstanceOf[LogicalRelation]) {
- p.transformAllExpressions {
- case a@Alias(exp, _)
- if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
- Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers,
- a.explicitMetadata)
- case exp: NamedExpression
- if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
- CustomDeterministicExpression(exp)
- }
- } else {
- p
- }
- case f@Filter(condition: Expression, cd) =>
- if (cd.isInstanceOf[Project] || cd.isInstanceOf[LogicalRelation]) {
- f.transformAllExpressions {
- case a@Alias(exp, _)
- if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
- Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers,
- a.explicitMetadata)
- case exp: NamedExpression
- if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
- CustomDeterministicExpression(exp)
- }
- } else {
- f
- }
- }
-
- updateDtrFn
- }
-
- private def collectInformationOnAttributes(plan: LogicalPlan,
- aliasMap: CarbonAliasDecoderRelation) {
- plan transformAllExpressions {
- case a@Alias(exp, name) =>
- exp match {
- case attr: Attribute => aliasMap.put(a.toAttribute, attr)
- case _ => aliasMap.put(a.toAttribute, AttributeReference("", StringType)())
- }
- a
- }
- // collect the output of expand and add projections attributes as alias to it.
- plan.collect {
- case expand: Expand =>
- expand.projections.foreach {s =>
- s.zipWithIndex.foreach { f =>
- f._1 match {
- case attr: AttributeReference =>
- aliasMap.put(expand.output(f._2).toAttribute, attr)
- case a@Alias(attr: AttributeReference, name) =>
- aliasMap.put(expand.output(f._2).toAttribute, attr)
- case others =>
- }
- }
- }
- }
- }
-
- // Collect aggregates on dimensions so that we can add decoder to it.
- private def collectDimensionAggregates(aggExp: AggregateExpression,
- attrsOndimAggs: util.HashSet[AttributeReferenceWrapper],
- aliasMap: CarbonAliasDecoderRelation,
- attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]) {
- aggExp collect {
- case attr: AttributeReference if isDictionaryEncoded(attr, attrMap, aliasMap) =>
- attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- }
-
- /**
- * Update the attribute datatype with [IntegerType] if the carbon column is encoded with
- * dictionary.
- *
- */
- private def updateDataType(attr: Attribute,
- attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation],
- allAttrsNotDecode: java.util.Set[AttributeReferenceWrapper],
- aliasMap: CarbonAliasDecoderRelation): Attribute = {
- val uAttr = aliasMap.getOrElse(attr, attr)
- val relation = Option(attrMap.get(AttributeReferenceWrapper(uAttr)))
- if (relation.isDefined) {
- relation.get.dictionaryMap.get(uAttr.name) match {
- case Some(true)
- if !allAttrsNotDecode.contains(AttributeReferenceWrapper(uAttr)) =>
- val newAttr = AttributeReference(attr.name,
- IntegerType,
- attr.nullable,
- attr.metadata)(attr.exprId, attr.qualifiers)
- newAttr
- case _ => attr
- }
- } else {
- attr
- }
- }
-
- private def isDictionaryEncoded(attr: Attribute,
- attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation],
- aliasMap: CarbonAliasDecoderRelation): Boolean = {
- val uAttr = aliasMap.getOrElse(attr, attr)
- val relation = Option(attrMap.get(AttributeReferenceWrapper(uAttr)))
- if (relation.isDefined) {
- relation.get.dictionaryMap.get(uAttr.name) match {
- case Some(true) => true
- case _ => false
- }
- } else {
- false
- }
- }
-
- def qualifierPresence(plan: LogicalPlan, attr: Attribute): Boolean = {
- var present = false
- plan collect {
- case l: LogicalRelation if l.attributeMap.contains(attr) =>
- present = true
- }
- present
- }
-}
-
-case class CarbonDecoderRelation(
- attributeMap: AttributeMap[AttributeReference],
- carbonRelation: CarbonDatasourceRelation) {
-
- val extraAttrs = new ArrayBuffer[Attribute]()
-
- def addAttribute(attr: Attribute): Unit = {
- extraAttrs += attr
- }
-
- def contains(attr: Attribute): Boolean = {
- val exists =
- attributeMap.exists(entry => entry._1.name.equalsIgnoreCase(attr.name) &&
- entry._1.exprId.equals(attr.exprId)) ||
- extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) &&
- entry.exprId.equals(attr.exprId))
- exists
- }
-
- def fillAttributeMap(attrMap: java.util.HashMap[AttributeReferenceWrapper,
- CarbonDecoderRelation]): Unit = {
- attributeMap.foreach { attr =>
- attrMap.put(AttributeReferenceWrapper(attr._1), this)
- }
- }
-
- lazy val dictionaryMap = carbonRelation.carbonRelation.metaData.dictionaryMap
-}
-
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala
deleted file mode 100644
index bb00126..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.test
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{CarbonContext, DataFrame, SQLContext}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * This class is a sql executor of unit test case for spark version 1.x.
- */
-
-class SparkTestQueryExecutor extends TestQueryExecutorRegister {
- override def sql(sqlText: String): DataFrame = SparkTestQueryExecutor.cc.sql(sqlText)
-
- override def sqlContext: SQLContext = SparkTestQueryExecutor.cc
-
- override def stop(): Unit = SparkTestQueryExecutor.cc.sparkContext.stop()
-}
-
-object SparkTestQueryExecutor {
- private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- LOGGER.info("use TestQueryExecutorImplV1")
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
- System.getProperty("java.io.tmpdir"))
- .addProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL)
- .addProperty(CarbonCommonConstants.STORE_LOCATION, TestQueryExecutor.storeLocation)
-
- val sc = new SparkContext(new SparkConf()
- .setAppName("CarbonSpark")
- .setMaster("local[2]")
- .set("spark.sql.shuffle.partitions", "20"))
- sc.setLogLevel("ERROR")
-
- val cc = new CarbonContext(sc, TestQueryExecutor.storeLocation, TestQueryExecutor.metastoredb)
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala
deleted file mode 100644
index e73f78c..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.util
-
-import org.apache.spark.TaskContext
-
-
-object TaskContextUtil {
- def setTaskContext(context: TaskContext): Unit = {
- val localThreadContext = TaskContext.get()
- if (localThreadContext == null) {
- TaskContext.setTaskContext(context)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
deleted file mode 100644
index d09c9b5..0000000
--- a/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ /dev/null
@@ -1,17 +0,0 @@
-## ------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements. See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License. You may obtain a copy of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ------------------------------------------------------------------------
-org.apache.spark.sql.CarbonSource
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.test.TestQueryExecutorRegister
----------------------------------------------------------------------
diff --git a/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.test.TestQueryExecutorRegister b/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.test.TestQueryExecutorRegister
deleted file mode 100644
index fc96db4..0000000
--- a/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.test.TestQueryExecutorRegister
+++ /dev/null
@@ -1,17 +0,0 @@
-## ------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements. See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License. You may obtain a copy of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ------------------------------------------------------------------------
-org.apache.spark.sql.test.SparkTestQueryExecutor
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/resources/badrecords/test2.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/badrecords/test2.csv b/integration/spark/src/test/resources/badrecords/test2.csv
deleted file mode 100644
index 51d25b2..0000000
--- a/integration/spark/src/test/resources/badrecords/test2.csv
+++ /dev/null
@@ -1,4 +0,0 @@
-0,569,silo
-1,843658743265874365874365874365584376547598375987,hello
-2,87436587349436587568784658743065874376,priyal
-3,56985,simple
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala
deleted file mode 100644
index aaaf66b..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.integration.spark.testsuite.complexType
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * Test class of creating and loading for carbon table with double
- *
- */
-class TestComplexPrimitiveTimestampDirectDictionary extends QueryTest with BeforeAndAfterAll {
-
- override def beforeAll: Unit = {
- sql("drop table if exists complexcarbontimestamptable")
- sql("drop table if exists complexhivetimestamptable")
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss.SSS")
- sql("CREATE TABLE complexcarbontimestamptable (empno string,workdate Timestamp,punchinout array<Timestamp>, worktime struct<begintime:Timestamp, endtime:Timestamp>, salary double) STORED BY 'org.apache.carbondata.format'")
- sql(s"LOAD DATA local inpath '$resourcesPath/datasamplecomplex.csv' INTO TABLE complexcarbontimestamptable OPTIONS" +
- "('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='empno,workdate,punchinout,worktime,salary')");
- sql("CREATE TABLE complexhivetimestamptable (empno string,workdate Timestamp,punchinout array<Timestamp>, worktime struct<begintime:Timestamp, endtime:Timestamp>, salary double)row format delimited fields terminated by ',' collection items terminated by '$'")
- sql(s"LOAD DATA local inpath '$resourcesPath/datasamplecomplex.csv' INTO TABLE complexhivetimestamptable")
- }
-
- test("select * query") {
- checkAnswer(sql("select * from complexcarbontimestamptable"),
- sql("select * from complexhivetimestamptable"))
- }
-
- test("timestamp complex type in the middle of complex types") {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss.SSS")
- sql("CREATE TABLE testtimestampcarbon(imei string,rat array<string>, sid array<int>, end_time array<Timestamp>, probeid array<double>, contact struct<name:string, id:string>)STORED BY 'org.apache.carbondata.format'")
- sql("LOAD DATA local inpath '" + resourcesPath + "/timestampdata.csv' INTO TABLE testtimestampcarbon options('DELIMITER'=',', 'QUOTECHAR'='\"','COMPLEX_DELIMITER_LEVEL_1'='$', 'FILEHEADER'='imei,rat,sid,end_time,probeid,contact')")
- sql("CREATE TABLE testtimestamphive(imei string,rat array<string>, sid array<int>, end_time array<Timestamp>, probeid array<double>, contact struct<name:string, id:string>)row format delimited fields terminated by ',' collection items terminated by '$'")
- sql(s"LOAD DATA local inpath '$resourcesPath/timestampdata.csv' INTO TABLE testtimestamphive")
- checkAnswer(sql("select * from testtimestampcarbon"), sql("select * from testtimestamphive"))
- sql("drop table if exists testtimestampcarbon")
- sql("drop table if exists testtimestamphive")
- }
-
- override def afterAll {
- sql("drop table if exists complexcarbontimestamptable")
- sql("drop table if exists complexhivetimestamptable")
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
deleted file mode 100644
index 98e4f18..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.integration.spark.testsuite.dataload
-
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
-import org.scalatest.BeforeAndAfterAll
-
-class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll {
-
- var df: DataFrame = _
-
- override def beforeAll {
- sql("DROP TABLE IF EXISTS carbon1")
-
- import sqlContext.implicits._
- df = sqlContext.sparkContext.parallelize(1 to 1000)
- .map(x => ("a", "b", x))
- .toDF("c1", "c2", "c3")
-
- // save dataframe to carbon file
- df.write
- .format("carbondata")
- .option("tableName", "carbon1")
- .mode(SaveMode.Overwrite)
- .save()
- }
-
- test("read and write using CarbonContext") {
- val in = sqlContext.read
- .format("carbondata")
- .option("tableName", "carbon1")
- .load()
-
- assert(in.where("c3 > 500").count() == 500)
- }
-
- test("read and write using CarbonContext with compression") {
- val in = sqlContext.read
- .format("carbondata")
- .option("tableName", "carbon1")
- .option("compress", "true")
- .load()
-
- assert(in.where("c3 > 500").count() == 500)
- }
-
- test("test overwrite") {
- sql("DROP TABLE IF EXISTS carbon4")
- df.write
- .format("carbondata")
- .option("tableName", "carbon4")
- .mode(SaveMode.Overwrite)
- .save()
- df.write
- .format("carbondata")
- .option("tableName", "carbon4")
- .mode(SaveMode.Overwrite)
- .save()
- val in = sqlContext.read
- .format("carbondata")
- .option("tableName", "carbon4")
- .load()
- assert(in.where("c3 > 500").count() == 500)
- sql("DROP TABLE IF EXISTS carbon4")
- }
-
- test("read and write using CarbonContext, multiple load") {
- sql("DROP TABLE IF EXISTS carbon4")
- df.write
- .format("carbondata")
- .option("tableName", "carbon4")
- .mode(SaveMode.Overwrite)
- .save()
- df.write
- .format("carbondata")
- .option("tableName", "carbon4")
- .mode(SaveMode.Append)
- .save()
- val in = sqlContext.read
- .format("carbondata")
- .option("tableName", "carbon4")
- .load()
- assert(in.where("c3 > 500").count() == 1000)
- sql("DROP TABLE IF EXISTS carbon4")
- }
-
- test("query using SQLContext") {
- val newSQLContext = new SQLContext(sqlContext.sparkContext)
- newSQLContext.sql(
- s"""
- | CREATE TEMPORARY TABLE temp
- | (c1 string, c2 string, c3 int)
- | USING org.apache.spark.sql.CarbonSource
- | OPTIONS (path '$storeLocation/default/carbon1')
- """.stripMargin)
- checkAnswer(newSQLContext.sql(
- """
- | SELECT c1, c2, count(*)
- | FROM temp
- | WHERE c3 > 100
- | GROUP BY c1, c2
- """.stripMargin), Seq(Row("a", "b", 900)))
- newSQLContext.dropTempTable("temp")
- }
-
- test("query using SQLContext without providing schema") {
- val newSQLContext = new SQLContext(sqlContext.sparkContext)
- newSQLContext.sql(
- s"""
- | CREATE TEMPORARY TABLE temp
- | USING org.apache.spark.sql.CarbonSource
- | OPTIONS (path '$storeLocation/default/carbon1')
- """.stripMargin)
- checkAnswer(newSQLContext.sql(
- """
- | SELECT c1, c2, count(*)
- | FROM temp
- | WHERE c3 > 100
- | GROUP BY c1, c2
- """.stripMargin), Seq(Row("a", "b", 900)))
- newSQLContext.dropTempTable("temp")
- }
-
- test("query using SQLContext, multiple load") {
- sql("DROP TABLE IF EXISTS test")
- sql(
- """
- | CREATE TABLE test(id int, name string, city string, age int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
- val testData = s"${resourcesPath}/sample.csv"
- sql(s"LOAD DATA LOCAL INPATH '$testData' into table test")
- sql(s"LOAD DATA LOCAL INPATH '$testData' into table test")
-
- val newSQLContext = new SQLContext(sqlContext.sparkContext)
- newSQLContext.sql(
- s"""
- | CREATE TEMPORARY TABLE temp
- | (id int, name string, city string, age int)
- | USING org.apache.spark.sql.CarbonSource
- | OPTIONS (path '$storeLocation/default/test')
- """.stripMargin)
- checkAnswer(newSQLContext.sql(
- """
- | SELECT count(id)
- | FROM temp
- """.stripMargin), Seq(Row(8)))
- newSQLContext.dropTempTable("temp")
- sql("DROP TABLE test")
- }
-
- test("json data with long datatype issue CARBONDATA-405") {
- val jsonDF = sqlContext.read.format("json").load(s"$resourcesPath/test.json")
- jsonDF.write
- .format("carbondata")
- .option("tableName", "dftesttable")
- .option("compress", "true")
- .mode(SaveMode.Overwrite)
- .save()
- val carbonDF = sqlContext
- .read
- .format("carbondata")
- .option("tableName", "dftesttable")
- .load()
- checkAnswer(
- carbonDF.select("age", "name"),
- jsonDF.select("age", "name"))
- sql("drop table dftesttable")
- }
-
- override def afterAll {
- sql("DROP TABLE IF EXISTS carbon1")
- sql("DROP TABLE IF EXISTS carbon2")
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala
deleted file mode 100644
index b61ecce..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.integration.spark.testsuite.dataload
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-/**
- * Test Class for data loading when there is single quote in fact data
- *
- */
-class TestLoadDataWithSingleQuotechar extends QueryTest with BeforeAndAfterAll {
- override def beforeAll {
- sql("DROP TABLE IF EXISTS carbontable")
- sql(
- "CREATE TABLE carbontable (id Int, name String) STORED BY 'carbondata'")
- }
-
- test("test data loading with single quote char") {
- try {
- sql(
- s"LOAD DATA LOCAL INPATH '$resourcesPath/dataWithSingleQuote.csv' INTO TABLE " +
- "carbontable OPTIONS('DELIMITER'= ',')")
- sql("SELECT * from carbontable")
- checkAnswer(
- sql("SELECT * from carbontable"),
- Seq(Row(1,"Tom"),
- Row(2,"Tony\n3,Lily"),
- Row(4,"Games\""),
- Row(5,"prival\"\n6,\"hello\"")
- )
- )
- } catch {
- case e: Throwable =>
- assert(false)
- }
- }
-
- override def afterAll {
- sql("DROP TABLE carbontable")
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllQueriesSpark1TestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllQueriesSpark1TestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllQueriesSpark1TestCase.scala
deleted file mode 100644
index 1c003c7..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllQueriesSpark1TestCase.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.allqueries
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{Row, SaveMode}
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * Test Class for all query on multiple datatypes
- *
- */
-class AllQueriesSpark1TestCase extends QueryTest with BeforeAndAfterAll {
-
- override def beforeAll {
- clean
-
- sql("create table if not exists Carbon_automation_test (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_INCLUDE'='Latest_MONTH,Latest_DAY,deviceInformationId')")
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
- sql("LOAD DATA LOCAL INPATH '" + resourcesPath + "/100_olap.csv' INTO table Carbon_automation_test options('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'= 'imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')")
- }
-
- def clean {
- sql("drop table if exists Carbon_automation_test")
- }
-
- override def afterAll {
- clean
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
- }
-
-
- //TC_113
- test("select percentile_approx(deviceInformationId,0.2) as a from Carbon_automation_test")({
- checkAnswer(
- sql("select percentile_approx(deviceInformationId,0.2) as a from Carbon_automation_test"),
- Seq(Row(100005.8)))
- })
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableSpark1TestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableSpark1TestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableSpark1TestCase.scala
deleted file mode 100644
index d762ec6..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableSpark1TestCase.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.testsuite.allqueries
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-class InsertIntoCarbonTableSpark1TestCase extends QueryTest with BeforeAndAfterAll {
- override def beforeAll {
- sql("drop table if exists THive")
- sql("create table THive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
- sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE THive")
- }
-
-
- test("insert from carbon-select columns-source table has more column then target column") {
- val timeStampPropOrig = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-
- sql("drop table if exists load")
- sql("drop table if exists inser")
- sql("CREATE TABLE load(imei string,age int,task bigint,num double,level decimal(10,3),productdate timestamp,name string,point int)STORED BY 'org.apache.carbondata.format'")
- sql("LOAD DATA INPATH '" + resourcesPath + "/shortolap.csv' INTO TABLE load options ('DELIMITER'=',', 'QUOTECHAR'='\"','FILEHEADER' = 'imei,age,task,num,level,productdate,name,point')")
- sql("CREATE TABLE inser(imei string,age int,task bigint,num double,level decimal(10,3),productdate timestamp)STORED BY 'org.apache.carbondata.format'")
- sql("insert into inser select * from load")
- checkAnswer(
- sql("select * from inser"),
- sql("select imei,age,task,num,level,productdate from load")
- )
- sql("drop table if exists load")
- sql("drop table if exists inser")
- CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeStampPropOrig)
- }
-
- test("insert->hive column more than carbon column->success") {
- sql("drop table if exists TCarbon")
- sql("create table TCarbon (imei string,deviceInformationId int,MAC string,deviceColor string,gamePointId double,contractNumber BigInt) STORED BY 'org.apache.carbondata.format'")
-
- sql("insert into TCarbon select imei,deviceInformationId,MAC,deviceColor,gamePointId,contractNumber,device_backColor,modelId,CUPAudit,CPIClocked from THive")
- checkAnswer(
- sql("select imei,deviceInformationId,MAC,deviceColor,gamePointId,contractNumber from THive"),
- sql("select imei,deviceInformationId,MAC,deviceColor,gamePointId,contractNumber from TCarbon")
- )
- }
-
-// test("insert->insert empty data -pass") {
-// sql("drop table if exists TCarbon")
-// sql("create table TCarbon (imei string,deviceInformationId int,MAC string) STORED BY 'org.apache.carbondata.format'")
-// sql("insert into TCarbon select imei,deviceInformationId,MAC from THive where MAC='wrongdata'")
-// val result = sql("select imei,deviceInformationId,MAC from TCarbon where MAC='wrongdata'").collect()
-// checkAnswer(
-// sql("select imei,deviceInformationId,MAC from THive where MAC='wrongdata'"),
-// sql("select imei,deviceInformationId,MAC from TCarbon where MAC='wrongdata'")
-// )
-// }
-
- override def afterAll {
- sql("drop table if exists load")
- sql("drop table if exists inser")
- sql("DROP TABLE IF EXISTS THive")
- sql("DROP TABLE IF EXISTS TCarbon")
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
deleted file mode 100644
index 7aee00d..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.badrecordloger
-
-import java.io.File
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.hive.HiveContext
-import org.scalatest.BeforeAndAfterAll
-
-
-/**
- * Test Class for detailed query on timestamp datatypes
- *
- *
- */
-class BadRecordLoggerSharedDictionaryTest extends QueryTest with BeforeAndAfterAll {
- var hiveContext: HiveContext = _
- var csvFilePath : String = null
- var timestamp_format: String = null
-
- override def beforeAll {
- sql("drop table IF EXISTS testdrive")
- sql(
- """create table testdrive (ID int,CUST_ID int,cust_name string)
- STORED BY 'org.apache.carbondata.format'
- TBLPROPERTIES("columnproperties.cust_name.shared_column"="shared.cust_name",
- "columnproperties.ID.shared_column"="shared.ID",
- "columnproperties.CUST_ID.shared_column"="shared.CUST_ID",
- 'DICTIONARY_INCLUDE'='ID,CUST_ID')"""
- )
-
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- new File("./target/test/badRecords")
- .getCanonicalPath
- )
-
- val carbonProp = CarbonProperties.getInstance()
- timestamp_format = carbonProp.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
- CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
- carbonProp.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/mm/dd")
- val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
- .getCanonicalPath
- csvFilePath = currentDirectory + "/src/test/resources/badrecords/test2.csv"
-
- }
- test("dataload with bad record test") {
- try {
- sql(
- s"""LOAD DATA INPATH '$csvFilePath' INTO TABLE testdrive OPTIONS('DELIMITER'=',',
- |'QUOTECHAR'= '"', 'BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='FAIL',
- |'FILEHEADER'= 'ID,CUST_ID,cust_name')""".stripMargin)
- } catch {
- case e: Throwable =>
- assert(e.getMessage.contains("Data load failed due to bad record"))
- }
- }
-
- override def afterAll {
- sql("drop table IF EXISTS testdrive")
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timestamp_format)
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createtable/TestCreateTableSyntax.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createtable/TestCreateTableSyntax.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createtable/TestCreateTableSyntax.scala
deleted file mode 100644
index 7400839..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createtable/TestCreateTableSyntax.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.createtable
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
-/**
- * Test Class for validating create table syntax for carbontable
- *
- */
-class TestCreateTableSyntax extends QueryTest with BeforeAndAfterAll {
-
- override def beforeAll {
- }
-
- test("Struct field with underscore and struct<struct> syntax check") {
- sql("drop table if exists carbontable")
- sql("create table carbontable(id int, username struct<sur_name:string," +
- "actual_name:struct<first_name:string,last_name:string>>, country string, salary double)" +
- "STORED BY 'org.apache.carbondata.format'")
- sql("describe carbontable")
- }
-
- test("Test table rename operation on carbon table and on hive table") {
- sql("drop table if exists hivetable")
- sql("drop table if exists carbontable")
- sql("drop table if exists hiveRenamedTable")
- sql("drop table if exists carbonRenamedTable")
- sql("create table hivetable(test1 int, test2 array<String>,test3 array<bigint>,"+
- "test4 array<int>,test5 array<decimal>,test6 array<timestamp>,test7 array<double>)"+
- "row format delimited fields terminated by ',' collection items terminated by '$' map keys terminated by ':'")
- sql("alter table hivetable rename To hiveRenamedTable")
- sql("create table carbontable(test1 int, test2 array<String>,test3 array<bigint>,"+
- "test4 array<int>,test5 array<decimal>,test6 array<timestamp>,test7 array<double>)"+
- "STORED BY 'org.apache.carbondata.format'")
- sql("alter table carbontable compact 'minor'")
- try {
- sql("alter table carbontable rename To carbonRenamedTable")
- assert(false)
- } catch {
- case e : MalformedCarbonCommandException => {
- assert(e.getMessage.equals("Unsupported alter operation on carbon table"))
- }
- }
- }
-
-
- test("test carbon table create with complex datatype as dictionary exclude") {
- try {
- sql("drop table if exists carbontable")
- sql("create table carbontable(id int, name string, dept string, mobile array<string>, "+
- "country string, salary double) STORED BY 'org.apache.carbondata.format' " +
- "TBLPROPERTIES('DICTIONARY_EXCLUDE'='dept,mobile')")
- assert(false)
- } catch {
- case e : MalformedCarbonCommandException => {
- assert(e.getMessage.equals("DICTIONARY_EXCLUDE is unsupported for complex datatype column: mobile"))
- }
- }
- }
-
- test("test carbon table create with double datatype as dictionary exclude") {
- try {
- sql("drop table if exists carbontable")
- sql("create table carbontable(id int, name string, dept string, mobile array<string>, "+
- "country string, salary double) STORED BY 'org.apache.carbondata.format' " +
- "TBLPROPERTIES('DICTIONARY_EXCLUDE'='salary')")
- assert(false)
- } catch {
- case e : MalformedCarbonCommandException => {
- assert(e.getMessage.equals("DICTIONARY_EXCLUDE is unsupported for double " +
- "data type column: salary"))
- }
- }
- }
- test("test carbon table create with int datatype as dictionary exclude") {
- sql("drop table if exists carbontable")
- sql("create table carbontable(id int, name string, dept string, mobile array<string>, " +
- "country string, salary double) STORED BY 'org.apache.carbondata.format' " +
- "TBLPROPERTIES('DICTIONARY_EXCLUDE'='id')")
- assert(true)
- }
-
- test("test carbon table create with decimal datatype as dictionary exclude") {
- try {
- sql("drop table if exists carbontable")
- sql("create table carbontable(id int, name string, dept string, mobile array<string>, "+
- "country string, salary decimal) STORED BY 'org.apache.carbondata.format' " +
- "TBLPROPERTIES('DICTIONARY_EXCLUDE'='salary')")
- assert(false)
- } catch {
- case e : MalformedCarbonCommandException => {
- assert(e.getMessage.equals("DICTIONARY_EXCLUDE is unsupported for decimal " +
- "data type column: salary"))
- }
- }
- }
-
- test("describe formatted on hive table and carbon table") {
- sql("drop table if exists hivetable")
- sql("drop table if exists carbontable")
- sql("create table carbontable(id int, username struct<sur_name:string," +
- "actual_name:struct<first_name:string,last_name:string>>, country string, salary double)" +
- "STORED BY 'org.apache.carbondata.format'")
- sql("describe formatted carbontable")
- sql("create table hivetable(id int, username struct<sur_name:string," +
- "actual_name:struct<first_name:string,last_name:string>>, country string, salary double)")
- sql("describe formatted hivetable")
- }
-
- test("describe command carbon table for decimal scale and precision test") {
- sql("drop table if exists carbontablePrecision")
- sql("create table carbontablePrecision(id int, name string, dept string, mobile array<string>, "+
- "country string, salary decimal(10,6)) STORED BY 'org.apache.carbondata.format' " +
- "TBLPROPERTIES('DICTIONARY_INCLUDE'='salary,id')")
- checkAnswer(
- sql("describe carbontablePrecision"),
- Seq(Row("country","string",""),
- Row("dept","string",""),Row("id","int",""),Row("mobile","array<string>",""),Row("name","string",""),
- Row("salary","decimal(10,6)","")
- )
- )
- }
-
- test("create carbon table without dimensions") {
- try {
- sql("drop table if exists carbontable")
- sql("create table carbontable(msr1 int, msr2 double, msr3 bigint, msr4 decimal)" +
- " stored by 'org.apache.carbondata.format'")
- assert(true)
- } catch {
- case e : MalformedCarbonCommandException => {
- assert(e.getMessage.equals("Table default.carbontable can not be created without " +
- "key columns. Please use DICTIONARY_INCLUDE or DICTIONARY_EXCLUDE to " +
- "set at least one key column if all specified columns are numeric types"))
- }
- }
- }
-
- test("create carbon table with repeated table properties") {
- try {
- sql("drop table if exists carbontable")
- sql(
- """
- CREATE TABLE IF NOT EXISTS carbontable
- (ID Int, date Timestamp, country String,
- name String, phonetype String, serialname String, salary Int)
- STORED BY 'carbondata'
- TBLPROPERTIES('DICTIONARY_EXCLUDE'='country','DICTIONARY_INCLUDE'='ID',
- 'DICTIONARY_EXCLUDE'='phonetype', 'DICTIONARY_INCLUDE'='salary')
- """)
- assert(false)
- } catch {
- case e : MalformedCarbonCommandException => {
- assert(e.getMessage.equals("Table properties is repeated: dictionary_include,dictionary_exclude"))
- }
- }
- }
-
- override def afterAll {
- sql("drop table if exists hivetable")
- sql("drop table if exists carbontable")
- sql("drop table if exists hiveRenamedTable")
- sql("drop table if exists carbonRenamedTable")
- sql("drop table if exists carbontablePrecision")
- }
-}
\ No newline at end of file