Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/30 09:22:10 UTC
[20/35] carbondata git commit: [CARBONDATA-1597] Remove spark1 integration
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
deleted file mode 100644
index 2e93a6c..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-import scala.language.implicitConversions
-
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.hive.{CarbonMetaData, CarbonMetastoreTypes}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DataType, StructType}
-
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.merger.TableMeta
-import org.apache.carbondata.spark.{CarbonOption, _}
-
-/**
- * Carbon relation provider compliant to data source api.
- * Creates carbon relations
- */
-class CarbonSource extends RelationProvider
- with CreatableRelationProvider with HadoopFsRelationProvider with DataSourceRegister {
-
- override def shortName(): String = "carbondata"
-
- /**
- * Returns a new base relation with the given parameters.
- * Note: the parameters' keywords are case insensitive and this insensitivity is enforced
- * by the Map that is passed to the function.
- */
- override def createRelation(
- sqlContext: SQLContext,
- parameters: Map[String, String]): BaseRelation = {
- // if a path is provided we can directly create the Hadoop relation,
- // otherwise create the datasource relation
- parameters.get("path") match {
- case Some(path) => CarbonDatasourceHadoopRelation(sqlContext, Array(path), parameters, None)
- case _ =>
- val options = new CarbonOption(parameters)
- val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq
- val identifier = tableIdentifier match {
- case Seq(name) => TableIdentifier(name, None)
- case Seq(db, name) => TableIdentifier(name, Some(db))
- }
- CarbonDatasourceRelation(identifier, None)(sqlContext)
- }
- }
-
- override def createRelation(
- sqlContext: SQLContext,
- mode: SaveMode,
- parameters: Map[String, String],
- data: SchemaRDD): BaseRelation = {
-
- // To avoid a Derby problem, the dataframe needs to be written and read using CarbonContext
- require(sqlContext.isInstanceOf[CarbonContext], "Error in saving dataframe to carbon file, " +
- "must use CarbonContext to save dataframe")
-
- // The user should not specify a path since only one store is supported in carbon currently;
- // once multi-store is supported, this limitation can be removed
- require(!parameters.contains("path"), "'path' should not be specified, " +
- "the path to store carbon file is the 'storePath' specified when creating CarbonContext")
-
- val options = new CarbonOption(parameters)
- val storePath = CarbonContext.getInstance(sqlContext.sparkContext).storePath
- val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
- val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- .exists(tablePath)
- val (doSave, doAppend) = (mode, isExists) match {
- case (SaveMode.ErrorIfExists, true) =>
- sys.error(s"ErrorIfExists mode, path $storePath already exists.")
- case (SaveMode.Overwrite, true) =>
- val cc = CarbonContext.getInstance(sqlContext.sparkContext)
- cc.sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
- (true, false)
- case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
- (true, false)
- case (SaveMode.Append, _) =>
- (false, true)
- case (SaveMode.Ignore, exists) =>
- (!exists, false)
- }
-
- if (doSave) {
- // create the table and write data (Overwrite, or first write in ErrorIfExists/Ignore mode)
- new CarbonDataFrameWriter(data).saveAsCarbonFile(parameters)
- } else if (doAppend) {
- new CarbonDataFrameWriter(data).appendToCarbonFile(parameters)
- }
-
- createRelation(sqlContext, parameters)
- }
-
- override def createRelation(sqlContext: SQLContext,
- paths: Array[String],
- dataSchema: Option[StructType],
- partitionColumns: Option[StructType],
- parameters: Map[String, String]): HadoopFsRelation = {
- CarbonDatasourceHadoopRelation(sqlContext, paths, parameters, dataSchema)
- }
-}
-
-/**
- * Creates carbon relation compliant to data source api.
- * This relation is stored to hive metastore
- */
-private[sql] case class CarbonDatasourceRelation(
- tableIdentifier: TableIdentifier,
- alias: Option[String])
- (@transient context: SQLContext)
- extends BaseRelation with Serializable {
-
- lazy val carbonRelation: CarbonRelation = {
- CarbonEnv.get
- .carbonMetastore.lookupRelation1(tableIdentifier, None)(sqlContext)
- .asInstanceOf[CarbonRelation]
- }
-
- def getDatabaseName(): String = tableIdentifier.database.getOrElse("default")
-
- def getTable(): String = tableIdentifier.table
-
- def schema: StructType = carbonRelation.schema
-
- def sqlContext: SQLContext = context
-
- override def sizeInBytes: Long = carbonRelation.sizeInBytes
-}
-
-/**
- * Represents logical plan for one carbon table
- */
-case class CarbonRelation(
- databaseName: String,
- tableName: String,
- var metaData: CarbonMetaData,
- tableMeta: TableMeta,
- alias: Option[String])(@transient sqlContext: SQLContext)
- extends LeafNode with MultiInstanceRelation {
-
- def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
- childDim.getDataType.getName.toLowerCase match {
- case "array" => s"${
- childDim.getColName.substring(dimName.length + 1)
- }:array<${ getArrayChildren(childDim.getColName) }>"
- case "struct" => s"${
- childDim.getColName.substring(dimName.length + 1)
- }:struct<${ getStructChildren(childDim.getColName) }>"
- case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
- }
- }
-
- def getArrayChildren(dimName: String): String = {
- metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
- childDim.getDataType.getName.toLowerCase match {
- case "array" => s"array<${ getArrayChildren(childDim.getColName) }>"
- case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>"
- case dType => addDecimalScaleAndPrecision(childDim, dType)
- }
- }).mkString(",")
- }
-
- def getStructChildren(dimName: String): String = {
- metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
- childDim.getDataType.getName.toLowerCase match {
- case "array" => s"${
- childDim.getColName.substring(dimName.length + 1)
- }:array<${ getArrayChildren(childDim.getColName) }>"
- case "struct" => s"${
- childDim.getColName.substring(dimName.length + 1)
- }:struct<${ metaData.carbonTable.getChildren(childDim.getColName)
- .asScala.map(f => s"${ recursiveMethod(childDim.getColName, f) }").mkString(",")
- }>"
- case dType => s"${ childDim.getColName
- .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
- }
- }).mkString(",")
- }
-
- override def newInstance(): LogicalPlan = {
- CarbonRelation(databaseName, tableName, metaData, tableMeta, alias)(sqlContext)
- .asInstanceOf[this.type]
- }
-
- val dimensionsAttr = {
- val sett = new java.util.LinkedHashSet(tableMeta.carbonTable
- .getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName).asScala.asJava)
- sett.asScala.toSeq.filter(dim => !dim.isInvisible ||
- (dim.isInvisible && dim.isInstanceOf[CarbonImplicitDimension]))
- .map(dim => {
- val dimval = metaData.carbonTable
- .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
- val output: DataType = dimval.getDataType.getName.toLowerCase match {
- case "array" =>
- CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
- case "struct" =>
- CarbonMetastoreTypes.toDataType(s"struct<${ getStructChildren(dim.getColName) }>")
- case dType =>
- val dataType = addDecimalScaleAndPrecision(dimval, dType)
- CarbonMetastoreTypes.toDataType(dataType)
- }
-
- AttributeReference(
- dim.getColName,
- output,
- nullable = true)(qualifiers = tableName +: alias.toSeq)
- })
- }
-
- val measureAttr = {
- val factTable = tableMeta.carbonTable.getFactTableName
- new java.util.LinkedHashSet(
- tableMeta.carbonTable.
- getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
- asScala.asJava).asScala.toSeq.filter(!_.getColumnSchema.isInvisible)
- .map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType(
- metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.getName
- .toLowerCase match {
- case "float" => "double"
- case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
- case others => others
- }),
- nullable = true)(qualifiers = tableName +: alias.toSeq))
- }
-
- override val output = {
- val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
- .asScala
- columns.filter(!_.isInvisible).map { column =>
- if (column.isDimension()) {
- val output: DataType = column.getDataType.getName.toLowerCase match {
- case "array" =>
- CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")
- case "struct" =>
- CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>")
- case dType =>
- val dataType = addDecimalScaleAndPrecision(column, dType)
- CarbonMetastoreTypes.toDataType(dataType)
- }
- AttributeReference(column.getColName, output,
- nullable = true
- )(qualifiers = tableName +: alias.toSeq)
- } else {
- AttributeReference(column.getColName, CarbonMetastoreTypes.toDataType(
- column.getDataType.getName.toLowerCase match {
- case "float" => "double"
- case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column
- .getColumnSchema.getScale + ")"
- case others => others
- }
- ),
- nullable = true
- )(qualifiers = tableName +: alias.toSeq)
- }
- }
- }
- // TODO: Use data from the footers.
- override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
-
- override def equals(other: Any): Boolean = {
- other match {
- case p: CarbonRelation =>
- p.databaseName == databaseName && p.output == output && p.tableName == tableName
- case _ => false
- }
- }
-
- def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
- var dType = dataType
- if (dimval.getDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DECIMAL) {
- dType +=
- "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
- }
- dType
- }
-
- private var tableStatusLastUpdateTime = 0L
-
- private var sizeInBytesLocalValue = 0L
-
- def sizeInBytes: Long = {
- val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
- tableMeta.carbonTable.getAbsoluteTableIdentifier)
- if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
- val tablePath = CarbonStorePath.getCarbonTablePath(
- tableMeta.storePath,
- tableMeta.carbonTableIdentifier).getPath
- val fileType = FileFactory.getFileType(tablePath)
- if(FileFactory.isFileExist(tablePath, fileType)) {
- tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
- sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
- }
- }
- sizeInBytesLocalValue
- }
-
-}
-
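For context, the removed CarbonSource registered the short name "carbondata" with Spark 1.x's data source API, so tables were read and written through the DataFrame reader/writer backed by a CarbonContext rather than by file path. A minimal usage sketch under those assumptions (the option keys "dbName"/"tableName" mirror CarbonOption, and cc is an assumed CarbonContext created with a storePath):

    import org.apache.spark.sql.SaveMode

    // read an existing carbon table through the relation provider
    val df = cc.read.format("carbondata")
      .option("dbName", "default")
      .option("tableName", "sales")
      .load()

    // write it back; a "path" option is rejected because the location always comes
    // from the storePath of the CarbonContext. Overwrite drops and recreates the
    // table, Append loads into the existing one.
    df.write.format("carbondata")
      .option("dbName", "default")
      .option("tableName", "sales_backup")
      .mode(SaveMode.Overwrite)
      .save()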
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
deleted file mode 100644
index c14a61a..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.TaskContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
-import org.apache.spark.sql.hive.{CarbonMetastore, CarbonMetastoreTypes}
-import org.apache.spark.sql.optimizer.CarbonDecoderRelation
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
-import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
-import org.apache.carbondata.core.metadata.datatype.DataType
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.util.{CarbonTimeStatisticsFactory, DataTypeUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-
-/**
- * It decodes the dictionary key to value
- */
-case class CarbonDictionaryDecoder(
- relations: Seq[CarbonDecoderRelation],
- profile: CarbonProfile,
- aliasMap: CarbonAliasDecoderRelation,
- child: SparkPlan)
- (@transient sqlContext: SQLContext)
- extends UnaryNode {
-
- override def otherCopyArgs: Seq[AnyRef] = sqlContext :: Nil
-
- override val output: Seq[Attribute] = {
- child.output.map { a =>
- val attr = aliasMap.getOrElse(a, a)
- val relation = relations.find(p => p.contains(attr))
- if (relation.isDefined && canBeDecoded(attr)) {
- val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
- val carbonDimension = carbonTable
- .getDimensionByName(carbonTable.getFactTableName, attr.name)
- if (carbonDimension != null &&
- carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
- !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
- !carbonDimension.isComplex()) {
- val newAttr = AttributeReference(a.name,
- convertCarbonToSparkDataType(carbonDimension,
- relation.get.carbonRelation.carbonRelation),
- a.nullable,
- a.metadata)(a.exprId,
- a.qualifiers).asInstanceOf[Attribute]
- newAttr
- } else {
- a
- }
- } else {
- a
- }
- }
- }
-
-
- def canBeDecoded(attr: Attribute): Boolean = {
- profile match {
- case ip: IncludeProfile if ip.attributes.nonEmpty =>
- ip.attributes
- .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
- case ep: ExcludeProfile =>
- !ep.attributes
- .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
- case _ => true
- }
- }
-
- def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
- relation: CarbonRelation): types.DataType = {
- carbonDimension.getDataType match {
- case CarbonDataTypes.STRING => StringType
- case CarbonDataTypes.SHORT => ShortType
- case CarbonDataTypes.INT => IntegerType
- case CarbonDataTypes.LONG => LongType
- case CarbonDataTypes.DOUBLE => DoubleType
- case CarbonDataTypes.BOOLEAN => BooleanType
- case CarbonDataTypes.DECIMAL =>
- val scale: Int = carbonDimension.getColumnSchema.getScale
- val precision: Int = carbonDimension.getColumnSchema.getPrecision
- if (scale == 0 && precision == 0) {
- DecimalType(18, 2)
- } else {
- DecimalType(precision, scale)
- }
- case CarbonDataTypes.TIMESTAMP => TimestampType
- case CarbonDataTypes.DATE => DateType
- case CarbonDataTypes.STRUCT =>
- CarbonMetastoreTypes
- .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
- case CarbonDataTypes.ARRAY =>
- CarbonMetastoreTypes
- .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
- }
- }
-
- val getDictionaryColumnIds = {
- val attributes = child.output
- val dictIds: Array[(String, ColumnIdentifier, DataType, CarbonDimension)] =
- attributes.map { a =>
- val attr = aliasMap.getOrElse(a, a)
- val relation = relations.find(p => p.contains(attr))
- if (relation.isDefined && canBeDecoded(attr)) {
- val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
- val carbonDimension =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
- if (carbonDimension != null &&
- carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
- !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
- !carbonDimension.isComplex()) {
- (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
- carbonDimension.getDataType, carbonDimension)
- } else {
- (null, null, null, null)
- }
- } else {
- (null, null, null, null)
- }
-
- }.toArray
- dictIds
- }
-
- override def outputsUnsafeRows: Boolean = true
-
- override def canProcessUnsafeRows: Boolean = true
-
- override def canProcessSafeRows: Boolean = true
-
- override def doExecute(): RDD[InternalRow] = {
- attachTree(this, "execute") {
- val storePath = sqlContext.catalog.asInstanceOf[CarbonMetastore].storePath
- val queryId = sqlContext.getConf("queryId", System.nanoTime() + "")
- val absoluteTableIdentifiers = relations.map { relation =>
- val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
- (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
- }.toMap
-
- val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId)
- if (isRequiredToDecode) {
- val dataTypes = child.output.map { attr => attr.dataType }
- child.execute().mapPartitions { iter =>
- val cacheProvider: CacheProvider = CacheProvider.getInstance
- val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
- cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
- val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
- forwardDictionaryCache)
- val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
- // add a task completion listener to clear the dictionaries; clearing them is a
- // decisive factor for the LRU eviction policy
- val dictionaryTaskCleaner = TaskContext.get
- dictionaryTaskCleaner.addTaskCompletionListener(context =>
- dicts.foreach { dictionary =>
- if (null != dictionary) {
- dictionary.clear()
- }
- }
- )
- new Iterator[InternalRow] {
- val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
-
- override final def hasNext: Boolean = {
- iter.hasNext
- }
-
- override final def next(): InternalRow = {
- val row: InternalRow = iter.next()
- val data = row.toSeq(dataTypes).toArray
- dictIndex.foreach { index =>
- if (data(index) != null) {
- data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
- .getDictionaryValueForKeyInBytes(data(index).asInstanceOf[Int]),
- getDictionaryColumnIds(index)._4)
- }
- }
- val result = unsafeProjection(new GenericMutableRow(data))
- result
- }
- }
- }
- } else {
- child.execute()
- }
- }
- }
-
- private def isRequiredToDecode = {
- getDictionaryColumnIds.find(p => p._1 != null) match {
- case Some(value) => true
- case _ => false
- }
- }
-
- private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
- cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
- val dictionaryColumnIds = getDictionaryColumnIds.map { dictionaryId =>
- if (dictionaryId._2 != null) {
- new DictionaryColumnUniqueIdentifier(
- atiMap(dictionaryId._1).getCarbonTableIdentifier,
- dictionaryId._2, dictionaryId._3,
- CarbonStorePath.getCarbonTablePath(atiMap(dictionaryId._1)))
- } else {
- null
- }
- }
- try {
- val noDictionaryIndexes = new java.util.ArrayList[Int]()
- dictionaryColumnIds.zipWithIndex.foreach { columnIndex =>
- if (columnIndex._1 == null) {
- noDictionaryIndexes.add(columnIndex._2)
- }
- }
- val dict = cache.getAll(dictionaryColumnIds.filter(_ != null).toSeq.asJava);
- val finalDict = new java.util.ArrayList[Dictionary]()
- var dictIndex: Int = 0
- dictionaryColumnIds.zipWithIndex.foreach { columnIndex =>
- if (!noDictionaryIndexes.contains(columnIndex._2)) {
- finalDict.add(dict.get(dictIndex))
- dictIndex += 1
- } else {
- finalDict.add(null)
- }
- }
- finalDict.asScala
- } catch {
- case t: Throwable => Seq.empty
- }
-
- }
-
-}
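The decoder above runs after the child plan and swaps dictionary surrogate keys (stored as Int) back to the real column values; only dimensions encoded with DICTIONARY, but not DIRECT_DICTIONARY and not complex, are touched. A stripped-down sketch of the per-row step from doExecute(), with the loop pulled out into a helper for readability (argument names are illustrative):

    import org.apache.carbondata.core.cache.dictionary.Dictionary
    import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
    import org.apache.carbondata.core.util.DataTypeUtil

    // dictIndex lists the row positions holding surrogate keys; dicts and dims are
    // aligned with the row layout and unused for the other positions.
    def decodeRow(data: Array[Any],
        dictIndex: Seq[Int],
        dicts: Seq[Dictionary],
        dims: Seq[CarbonDimension]): Array[Any] = {
      dictIndex.foreach { i =>
        if (data(i) != null) {
          data(i) = DataTypeUtil.getDataBasedOnDataType(
            dicts(i).getDictionaryValueForKeyInBytes(data(i).asInstanceOf[Int]), dims(i))
        }
      }
      data
    }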
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
deleted file mode 100644
index 36cd6f2..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.sql.hive.{CarbonIUDAnalysisRule, CarbonMetastore}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport
-import org.apache.carbondata.spark.rdd.SparkReadSupport
-
-case class CarbonEnv(carbonMetastore: CarbonMetastore)
-
-object CarbonEnv {
-
- @volatile private var carbonEnv: CarbonEnv = _
-
- // set the readsupport class globally so that executors can get it.
- SparkReadSupport.readSupportClass = classOf[RawDataReadSupport]
-
- var initialized = false
-
- def init(sqlContext: SQLContext): Unit = {
- if (!initialized) {
- val cc = sqlContext.asInstanceOf[CarbonContext]
- val catalog = new CarbonMetastore(cc, cc.storePath, cc.hiveClientInterface, "")
- carbonEnv = CarbonEnv(catalog)
- CarbonIUDAnalysisRule.init(sqlContext)
- initialized = true
- CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
- }
- }
-
- def get: CarbonEnv = {
- if (initialized) carbonEnv
- else throw new RuntimeException("CarbonEnv not initialized")
- }
-}
-
-
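CarbonEnv above is a process-wide singleton around the CarbonMetastore and has to be initialized from a CarbonContext before any metastore lookup can happen; get fails fast otherwise. A minimal sketch of that contract, assuming cc is a CarbonContext:

    CarbonEnv.init(cc)                              // no-op after the first successful call
    val metastore = CarbonEnv.get.carbonMetastore   // throws "CarbonEnv not initialized" if init was skipped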
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala
deleted file mode 100644
index 6ed8c0d..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.SQLConf.SQLConfEntry
-import org.apache.spark.sql.hive.CarbonSQLDialect
-
- /**
- * A trait that enables the setting and getting of mutable config parameters/hints.
- *
- */
-class CarbonSQLConf extends SQLConf {
-
- override def dialect: String = {
- getConf(SQLConf.DIALECT,
- classOf[CarbonSQLDialect].getCanonicalName)
- }
-
- override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
deleted file mode 100644
index a3c6343..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.CarbonInputMetrics
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.LeafNode
-import org.apache.spark.sql.hive.CarbonMetastore
-
-import org.apache.carbondata.core.scan.model._
-import org.apache.carbondata.hadoop.{CarbonProjection, InputMetricsStats}
-import org.apache.carbondata.spark.CarbonFilters
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
-
-case class CarbonScan(
- var columnProjection: Seq[Attribute],
- relationRaw: CarbonRelation,
- dimensionPredicatesRaw: Seq[Expression],
- useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode {
- val carbonTable = relationRaw.metaData.carbonTable
- val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
- val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
- @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastore]
-
- val attributesNeedToDecode = new java.util.LinkedHashSet[AttributeReference]()
- val unprocessedExprs = new ArrayBuffer[Expression]()
-
- val buildCarbonPlan: CarbonQueryPlan = {
- val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.databaseName, relationRaw.tableName)
- plan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
- processFilterExpressions(plan)
- plan
- }
-
- def processFilterExpressions(plan: CarbonQueryPlan) {
- if (dimensionPredicatesRaw.nonEmpty) {
- val expressionVal = CarbonFilters.processExpression(
- dimensionPredicatesRaw,
- attributesNeedToDecode,
- unprocessedExprs,
- carbonTable)
- expressionVal match {
- case Some(ce) =>
- // adding dimension used in expression in querystats
- plan.setFilterExpression(ce)
- case _ =>
- }
- }
- processExtraAttributes(plan)
- }
-
- private def processExtraAttributes(plan: CarbonQueryPlan) {
- if (attributesNeedToDecode.size() > 0) {
- val attributeOut = new ArrayBuffer[Attribute]() ++ columnProjection
-
- attributesNeedToDecode.asScala.foreach { attr =>
- if (!columnProjection.exists(_.name.equalsIgnoreCase(attr.name))) {
- attributeOut += attr
- }
- }
- columnProjection = attributeOut
- }
-
- val columns = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName)
- columns.addAll(carbonTable.getImplicitDimensionByTableName(carbonTable.getFactTableName))
- val colAttr = new Array[Attribute](columns.size())
- columnProjection.foreach { attr =>
- val column =
- carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
- if(column != null) {
- colAttr(columns.indexOf(column)) = attr
- }
- }
-
- columnProjection = colAttr.filter(f => f != null)
-
- var queryOrder: Integer = 0
- columnProjection.foreach { attr =>
- val carbonColumn = carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
- if (carbonColumn != null) {
- if (carbonColumn.isDimension()) {
- val dim = new QueryDimension(attr.name)
- dim.setQueryOrder(queryOrder)
- queryOrder = queryOrder + 1
- selectedDims += dim
- } else {
- val m1 = new QueryMeasure(attr.name)
- m1.setQueryOrder(queryOrder)
- queryOrder = queryOrder + 1
- selectedMsrs += m1
- }
- }
- }
-
- // Fill the selected dimensions & measures obtained from
- // attributes to query plan for detailed query
- selectedDims.foreach(plan.addDimension)
- selectedMsrs.foreach(plan.addMeasure)
- }
-
- def inputRdd: CarbonScanRDD = {
- val projection = new CarbonProjection
- columnProjection.foreach { attr =>
- projection.addColumn(attr.name)
- }
- val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
- new CarbonScanRDD(
- ocRaw.sparkContext,
- projection,
- buildCarbonPlan.getFilterExpression,
- carbonTable.getAbsoluteTableIdentifier,
- carbonTable.getTableInfo.serialize(),
- carbonTable.getTableInfo, inputMetricsStats
- )
- }
-
- override def outputsUnsafeRows: Boolean =
- (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
-
- override def doExecute(): RDD[InternalRow] = {
- val outUnsafeRows: Boolean = (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
- inputRdd.mapPartitions { iter =>
- val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
- new Iterator[InternalRow] {
- override def hasNext: Boolean = iter.hasNext
-
- override def next(): InternalRow = {
- val value = iter.next
- if (outUnsafeRows) {
- unsafeProjection(value)
- } else {
- value
- }
- }
- }
- }
- }
-
- def output: Seq[Attribute] = {
- columnProjection
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
deleted file mode 100644
index bc62a55..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
-
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
-
-case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
-
-object CarbonSparkUtil {
-
- def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
- val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
- .asScala.map(x => x.getColName)
- // wf : may be problem
- val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
- .asScala.map(x => x.getColName)
- val dictionary =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
- (f.getColName.toLowerCase,
- f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
- !f.getDataType.isComplexType)
- }
- CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
deleted file mode 100644
index 9dc9ee2..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ /dev/null
@@ -1,589 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.Map
-import scala.language.implicitConversions
-
-import org.apache.hadoop.hive.ql.lib.Node
-import org.apache.hadoop.hive.ql.parse._
-import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
-import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.ExplainCommand
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.DescribeCommand
-import org.apache.spark.sql.hive.HiveQlWrapper
-import org.apache.spark.sql.types.StructField
-
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.util.CommonUtil
-
-/**
- * Parser for All Carbon DDL, DML cases in Unified context
- */
-class CarbonSqlParser() extends CarbonDDLSqlParser {
-
- override def parse(input: String): LogicalPlan = {
- synchronized {
- // Initialize the Keywords.
- initLexical
- phrase(start)(new lexical.Scanner(input)) match {
- case Success(plan, _) => plan match {
- case x: LoadTable =>
- x.inputSqlString = input
- x
- case logicalPlan => logicalPlan
- }
- case failureOrError => sys.error(failureOrError.toString)
- }
- }
- }
-
- override protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
-
- protected lazy val startCommand: Parser[LogicalPlan] =
- createDatabase | dropDatabase | loadManagement | describeTable |
- showPartitions | showLoads | alterTable | updateTable | deleteRecords | useDatabase |
- createTable
-
- protected lazy val loadManagement: Parser[LogicalPlan] =
- deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
-
- protected lazy val createDatabase: Parser[LogicalPlan] =
- CREATE ~> (DATABASE | SCHEMA) ~> restInput ^^ {
- case statement =>
- val createDbSql = "CREATE DATABASE " + statement
- var dbName = ""
- // Get Ast node for create db command
- val node = HiveQlWrapper.getAst(createDbSql)
- node match {
- // get dbname
- case Token("TOK_CREATEDATABASE", children) =>
- dbName = BaseSemanticAnalyzer.unescapeIdentifier(children(0).getText)
- }
- CreateDatabase(convertDbNameToLowerCase(dbName), createDbSql)
- }
-
- protected lazy val dropDatabase: Parser[LogicalPlan] =
- DROP ~> (DATABASE | SCHEMA) ~> restInput ^^ {
- case statement =>
- val dropDbSql = "DROP DATABASE " + statement
- var dbName = ""
- var isCascade = false
- // Get Ast node for drop db command
- val node = HiveQlWrapper.getAst(dropDbSql)
- node match {
- case Token("TOK_DROPDATABASE", children) =>
- dbName = BaseSemanticAnalyzer.unescapeIdentifier(children(0).getText)
- // check whether cascade drop db
- children.collect {
- case t@Token("TOK_CASCADE", _) =>
- isCascade = true
- case _ => // Unsupported features
- }
- }
- DropDatabase(convertDbNameToLowerCase(dbName), isCascade, dropDbSql)
- }
-
- protected lazy val alterTable: Parser[LogicalPlan] =
- ALTER ~> TABLE ~> restInput ^^ {
- case statement =>
- try {
- val alterSql = "alter table " + statement
- // DDl will be parsed and we get the AST tree from the HiveQl
- val node = HiveQlWrapper.getAst(alterSql)
- // processing the AST tree
- nodeToPlanForAlterTable(node, alterSql)
- } catch {
- // MalformedCarbonCommandException needs to be thrown directly; the parser will catch it
- case ce: MalformedCarbonCommandException =>
- throw ce
- }
- }
-
- /**
- * Handles the CREATE TABLE DDL syntax, compatible with the Hive syntax
- */
- protected lazy val createTable: Parser[LogicalPlan] =
- restInput ^^ {
-
- case statement =>
- try {
- // DDl will be parsed and we get the AST tree from the HiveQl
- val node = HiveQlWrapper.getAst(statement)
- // processing the AST tree
- nodeToPlan(node)
- } catch {
- // MalformedCarbonCommandException needs to be thrown directly; the parser will catch it
- case ce: MalformedCarbonCommandException =>
- throw ce
- case e: Exception =>
- sys.error("Parsing error") // no need to do anything.
- }
- }
-
- /**
- * This function will traverse the tree and logical plan will be formed using that.
- *
- * @param node
- * @return LogicalPlan
- */
- protected def nodeToPlan(node: Node): LogicalPlan = {
- node match {
- // handle only if a create table token is found.
- case Token("TOK_CREATETABLE", children) =>
-
-
- var fields: Seq[Field] = Seq[Field]()
- var tableComment: String = ""
- var tableProperties = Map[String, String]()
- var partitionByFields: Seq[Field] = Seq[Field]()
- var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
- var likeTableName: String = ""
- var storedBy: String = ""
- var ifNotExistPresent: Boolean = false
- var dbName: Option[String] = None
- var tableName: String = ""
- var bucketFields: Option[BucketFields] = None
-
- try {
-
- // Checking whether create table request is carbon table
- children.collect {
- case Token("TOK_STORAGEHANDLER", child :: Nil) =>
- storedBy = BaseSemanticAnalyzer.unescapeSQLString(child.getText).trim.toLowerCase
- case _ =>
- }
- if (!(storedBy.equals(CarbonContext.datasourceName) ||
- storedBy.equals(CarbonContext.datasourceShortName))) {
- sys.error("Not a carbon format request")
- }
-
- children.collect {
- // collecting all the field list
- case list@Token("TOK_TABCOLLIST", _) =>
- val cols = BaseSemanticAnalyzer.getColumns(list, true)
- if (cols != null) {
- val dupColsGrp = cols.asScala.groupBy(x => x.getName) filter {
- case (_, colList) => colList.size > 1
- }
- if (dupColsGrp.nonEmpty) {
- var columnName: String = ""
- dupColsGrp.toSeq.foreach(columnName += _._1 + ", ")
- columnName = columnName.substring(0, columnName.lastIndexOf(", "))
- val errorMessage = "Duplicate column name: " + columnName + " found in table " +
- ".Please check create table statement."
- throw new MalformedCarbonCommandException(errorMessage)
- }
- cols.asScala.map { col =>
- val columnName = col.getName()
- val dataType = Option(col.getType)
- val name = Option(col.getName())
- // This is to parse complex data types
- val x = '`' + col.getName + '`' + ' ' + col.getType
- val f: Field = anyFieldDef(new lexical.Scanner(x))
- match {
- case Success(field, _) => field
- case failureOrError => throw new MalformedCarbonCommandException(
- s"Unsupported data type: ${ col.getType }")
- }
- // the data type of the decimal type will be like decimal(10,0)
- // so checking the start of the string and taking the precision and scale.
- // resetting the data type with decimal
- if (f.dataType.getOrElse("").startsWith("decimal")) {
- val (precision, scale) = getScaleAndPrecision(col.getType)
- f.precision = precision
- f.scale = scale
- f.dataType = Some("decimal")
- }
- if (f.dataType.getOrElse("").startsWith("char")) {
- f.dataType = Some("char")
- } else if (f.dataType.getOrElse("").startsWith("float")) {
- f.dataType = Some("float")
- }
- f.rawSchema = x
- fields ++= Seq(f)
- }
- }
-
- case Token("TOK_IFNOTEXISTS", _) =>
- ifNotExistPresent = true
-
- case t@Token("TOK_TABNAME", _) =>
- val (db, tblName) = extractDbNameTableName(t)
- dbName = db
- tableName = tblName.toLowerCase()
-
- case Token("TOK_TABLECOMMENT", child :: Nil) =>
- tableComment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
-
- case Token("TOK_TABLEPARTCOLS", list@Token("TOK_TABCOLLIST", _) :: Nil) =>
- val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
- if (cols != null) {
- cols.asScala.map { col =>
- val columnName = col.getName()
- val dataType = Option(col.getType)
- val comment = col.getComment
- val rawSchema = '`' + col.getName + '`' + ' ' + col.getType
- val field = Field(columnName, dataType, Some(columnName), None)
-
- // the data type of the decimal type will be like decimal(10,0)
- // so checking the start of the string and taking the precision and scale.
- // resetting the data type with decimal
- if (field.dataType.getOrElse("").startsWith("decimal")) {
- val (precision, scale) = getScaleAndPrecision(col.getType)
- field.precision = precision
- field.scale = scale
- field.dataType = Some("decimal")
- }
- if (field.dataType.getOrElse("").startsWith("char")) {
- field.dataType = Some("char")
- } else if (field.dataType.getOrElse("").startsWith("float")) {
- field.dataType = Some("float")
- }
- field.rawSchema = rawSchema
- val partitionCol = new PartitionerField(columnName, dataType, comment)
- partitionCols ++= Seq(partitionCol)
- partitionByFields ++= Seq(field)
- }
- }
- case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
- val propertySeq: Seq[(String, String)] = getProperties(list)
- val repeatedProperties = propertySeq.groupBy(_._1).filter(_._2.size > 1).keySet
- if (repeatedProperties.nonEmpty) {
- val repeatedPropStr: String = repeatedProperties.mkString(",")
- throw new MalformedCarbonCommandException("Table properties is repeated: " +
- repeatedPropStr)
- }
- tableProperties ++= propertySeq
-
- case Token("TOK_LIKETABLE", child :: Nil) =>
- likeTableName = child.getChild(0).getText()
- case Token("TOK_ALTERTABLE_BUCKETS",
- Token("TOK_TABCOLNAME", list) :: numberOfBuckets) =>
- val cols = list.map(_.getText)
- if (cols != null) {
- bucketFields = Some(BucketFields(cols,
- numberOfBuckets.head.getText.toInt))
- }
-
- case _ => // Unsupported features
- }
-
- // validate tblProperties
- if (!CommonUtil.validateTblProperties(tableProperties, fields)) {
- throw new MalformedCarbonCommandException("Invalid table properties")
- }
-
- if (partitionCols.nonEmpty) {
- if (!CommonUtil.validatePartitionColumns(tableProperties, partitionCols)) {
- throw new MalformedCarbonCommandException("Invalid partition definition")
- }
- // partition columns should not be part of the schema
- val colNames = fields.map(_.column)
- val badPartCols = partitionCols.map(_.partitionColumn).toSet.intersect(colNames.toSet)
- if (badPartCols.nonEmpty) {
- throw new MalformedCarbonCommandException(
- "Partition columns should not be specified in the schema: " +
- badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"))
- }
- fields ++= partitionByFields
- }
-
- // prepare table model of the collected tokens
- val tableModel: TableModel = prepareTableModel(ifNotExistPresent,
- dbName,
- tableName,
- fields,
- partitionCols,
- tableProperties,
- bucketFields)
-
- // get logical plan.
- CreateTable(tableModel)
- } catch {
- case ce: MalformedCarbonCommandException =>
- val message = if (tableName.isEmpty) {
- "Create table command failed. "
- }
- else if (dbName.isEmpty) {
- s"Create table command failed for $tableName. "
- }
- else {
- s"Create table command failed for ${ dbName.get }.$tableName. "
- }
- LOGGER.audit(message + ce.getMessage)
- throw ce
- }
-
- }
- }
-
- /**
- * This function will traverse the tree and logical plan will be formed using that.
- *
- * @param node
- * @return LogicalPlan
- */
- protected def nodeToPlanForAlterTable(node: Node, alterSql: String): LogicalPlan = {
- node match {
- // handle only if an alter table token is found.
- case Token("TOK_ALTERTABLE", children) =>
-
- var dbName: Option[String] = None
- var tableName: String = ""
- var compactionType: String = ""
-
- children.collect {
-
- case t@Token("TOK_TABNAME", _) =>
- val (db, tblName) = extractDbNameTableName(t)
- dbName = db
- tableName = tblName
-
- case Token("TOK_ALTERTABLE_COMPACT", child :: Nil) =>
- compactionType = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
-
- case _ => // Unsupported features
- }
-
- val altertablemodel = AlterTableModel(dbName,
- tableName,
- None,
- compactionType,
- Some(System.currentTimeMillis()),
- alterSql)
- AlterTableCompaction(altertablemodel)
- }
- }
-
- protected lazy val loadDataNew: Parser[LogicalPlan] =
- LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
- (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
- (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
- case filePath ~ isOverwrite ~ table ~ optionsList =>
- val (databaseNameOp, tableName) = table match {
- case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
- }
- if (optionsList.isDefined) {
- validateOptions(optionsList)
- }
- val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
- LoadTable(convertDbNameToLowerCase(databaseNameOp), tableName, filePath, Seq(), optionsMap,
- isOverwrite.isDefined)
- }
-
- protected lazy val describeTable: Parser[LogicalPlan] =
- ((DESCRIBE | DESC) ~> opt(EXTENDED | FORMATTED)) ~ (ident <~ ".").? ~ ident ^^ {
- case ef ~ db ~ tbl =>
- val tblIdentifier = db match {
- case Some(dbName) =>
- TableIdentifier(tbl.toLowerCase, Some(convertDbNameToLowerCase(dbName)))
- case None =>
- TableIdentifier(tbl.toLowerCase, None)
- }
- if (ef.isDefined && "FORMATTED".equalsIgnoreCase(ef.get)) {
- new DescribeFormattedCommand("describe formatted " + tblIdentifier,
- tblIdentifier)
- } else {
- new DescribeCommand(UnresolvedRelation(tblIdentifier, None), ef.isDefined)
- }
- }
-
- protected lazy val showLoads: Parser[LogicalPlan] =
- SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
- (LIMIT ~> numericLit).? <~
- opt(";") ^^ {
- case databaseName ~ tableName ~ limit =>
- ShowLoadsCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase(), limit)
- }
-
- protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
- DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~
- (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",")) <~ ")" ~ opt(";") ^^ {
- case dbName ~ tableName ~ loadids =>
- DeleteLoadsById(loadids, convertDbNameToLowerCase(dbName), tableName.toLowerCase())
- }
-
- protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
- DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
- (WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~
- opt(";") ^^ {
- case database ~ table ~ condition =>
- condition match {
- case dateField ~ dateValue =>
- DeleteLoadsByLoadDate(convertDbNameToLowerCase(database),
- table.toLowerCase(),
- dateField,
- dateValue)
- }
- }
-
- protected lazy val cleanFiles: Parser[LogicalPlan] =
- CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
- case databaseName ~ tableName =>
- CleanFiles(convertDbNameToLowerCase(databaseName), tableName.toLowerCase())
- }
-
- protected lazy val explainPlan: Parser[LogicalPlan] =
- (EXPLAIN ~> opt(EXTENDED)) ~ startCommand ^^ {
- case isExtended ~ logicalPlan =>
- logicalPlan match {
- case plan: CreateTable => ExplainCommand(logicalPlan, extended = isExtended.isDefined)
- case _ => ExplainCommand(OneRowRelation)
- }
- }
-
- protected lazy val deleteRecords: Parser[LogicalPlan] =
- (DELETE ~> FROM ~> table) ~ restInput.? <~ opt(";") ^^ {
- case table ~ rest =>
- val tableName = getTableName(table.tableIdentifier)
- val alias = table.alias.getOrElse("")
- DeleteRecords("select tupleId from " + tableName + " " + alias + rest.getOrElse(""), table)
- }
-
- protected lazy val updateTable: Parser[LogicalPlan] =
- UPDATE ~> table ~
- (SET ~> "(" ~> repsep(element, ",") <~ ")") ~
- ("=" ~> restInput) <~ opt(";") ^^ {
- case tab ~ columns ~ rest =>
- val (sel, where) = splitQuery(rest)
- val (selectStmt, relation) =
- if (!sel.toLowerCase.startsWith("select ")) {
- if (sel.trim.isEmpty) {
- sys.error("At least one source column has to be specified ")
- }
- // only a list of expressions is given; it needs to be converted into a
- // select statement on the destination table
- val relation = tab match {
- case r@UnresolvedRelation(tableIdentifier, alias) =>
- updateRelation(r, tableIdentifier, alias)
- case _ => tab
- }
- ("select " + sel + " from " + getTableName(relation.tableIdentifier) + " " +
- relation.alias.get, relation)
- } else {
- (sel, updateRelation(tab, tab.tableIdentifier, tab.alias))
- }
- UpdateTable(relation, columns, selectStmt, where)
- }
- protected lazy val showPartitions: Parser[LogicalPlan] =
- (SHOW ~> PARTITIONS ~> table) <~ opt(";") ^^ {
- case table =>
- val tableName = getTableName(table.tableIdentifier)
- val alias = table.alias.getOrElse("")
- ShowPartitions(table.tableIdentifier)
- }
-
- private def splitQuery(query: String): (String, String) = {
- val stack = scala.collection.mutable.Stack[Char]()
- var foundSingleQuotes = false
- var foundDoubleQuotes = false
- var foundEscapeChar = false
- var ignoreChar = false
- var stop = false
- var bracketCount = 0
- val (selectStatement, where) = query.span {
- ch => {
- if (stop) {
- false
- } else {
- ignoreChar = false
- if (foundEscapeChar && (ch == '\'' || ch == '\"' || ch == '\\')) {
- foundEscapeChar = false
- ignoreChar = true
- }
- // If escaped single or double quotes found, no need to consider
- if (!ignoreChar) {
- if (ch == '\\') {
- foundEscapeChar = true
- } else if (ch == '\'') {
- foundSingleQuotes = !foundSingleQuotes
- } else if (ch == '\"') {
- foundDoubleQuotes = !foundDoubleQuotes
- }
- else if (ch == '(' && !foundSingleQuotes && !foundDoubleQuotes) {
- bracketCount = bracketCount + 1
- stack.push(ch)
- } else if (ch == ')' && !foundSingleQuotes && !foundDoubleQuotes) {
- bracketCount = bracketCount + 1
- stack.pop()
- if (0 == stack.size) {
- stop = true
- }
- }
- }
- true
- }
- }
- }
- if (bracketCount == 0 || bracketCount % 2 != 0) {
- sys.error("Parsing error, missing bracket ")
- }
- val select = selectStatement.trim
- (select.substring(1, select.length - 1).trim -> where.trim)
- }
-
-
- protected lazy val table: Parser[UnresolvedRelation] = {
- rep1sep(attributeName, ".") ~ opt(ident) ^^ {
- case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
- }
- }
-
- protected lazy val attributeName: Parser[String] = acceptMatch("attribute name", {
- case lexical.Identifier(str) => str.toLowerCase
- case lexical.Keyword(str) if !lexical.delimiters.contains(str) => str.toLowerCase
- })
-
- private def updateRelation(
- r: UnresolvedRelation,
- tableIdentifier: Seq[String],
- alias: Option[String]): UnresolvedRelation = {
- alias match {
- case Some(_) => r
- case _ =>
- val tableAlias = tableIdentifier match {
- case Seq(dbName, tableName) => Some(tableName)
- case Seq(tableName) => Some(tableName)
- }
- UnresolvedRelation(tableIdentifier, tableAlias)
- }
- }
-
- private def getTableName(tableIdentifier: Seq[String]): String = {
- if (tableIdentifier.size > 1) {
- tableIdentifier(0) + "." + tableIdentifier(1)
- } else {
- tableIdentifier(0)
- }
- }
-
- protected lazy val element: Parser[String] =
- (ident <~ ".").? ~ ident ^^ {
- case table ~ column => column.toLowerCase
- }
-
- protected lazy val useDatabase: Parser[LogicalPlan] =
- USE ~> ident <~ opt(";") ^^ {
- case databaseName => UseDatabase(s"use ${ databaseName.toLowerCase }")
- }
-}
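The parser above layers Carbon-specific DDL and DML (database create/drop, carbon CREATE TABLE, load, segment management, compaction, update/delete, SHOW PARTITIONS) on top of the Hive AST. A few statement shapes it accepted, issued through an assumed CarbonContext cc; the table names, paths and option values are illustrative only:

    cc.sql("CREATE TABLE IF NOT EXISTS default.sales (id INT, name STRING) STORED BY 'carbondata'")
    cc.sql("LOAD DATA INPATH 'hdfs://nn/path/sales.csv' INTO TABLE default.sales OPTIONS('DELIMITER'=',')")
    cc.sql("SHOW SEGMENTS FOR TABLE default.sales LIMIT 10")
    cc.sql("DELETE FROM TABLE default.sales WHERE SEGMENT.ID IN (0, 1)")
    cc.sql("ALTER TABLE default.sales COMPACT 'MINOR'")
    cc.sql("CLEAN FILES FOR TABLE default.sales")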
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala
deleted file mode 100644
index 065dfed..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan}
-import org.apache.spark.util.{ScalaCompilerUtil, Utils}
-
-private[sql] class CodeGenerateFactory(version: String) {
-
- val optimizerFactory = if (version.equals("1.6.2") || version.equals("1.6.3")) {
- ScalaCompilerUtil.compiledCode(CodeTemplates.spark1_6_OptimizerString)
- .asInstanceOf[AbstractCarbonOptimizerFactory]
- } else if (version.startsWith("1.6") || version.startsWith("1.5")) {
- ScalaCompilerUtil.compiledCode(CodeTemplates.defaultOptimizerString)
- .asInstanceOf[AbstractCarbonOptimizerFactory]
- } else {
- throw new UnsupportedOperationException(s"Spark version $version is not supported")
- }
-
- val expandFactory = if (version.startsWith("1.5")) {
- ScalaCompilerUtil.compiledCode(CodeTemplates.spark1_5ExpandString)
- .asInstanceOf[AbstractCarbonExpandFactory]
- } else if (version.startsWith("1.6")) {
- new AbstractCarbonExpandFactory {
- override def createExpand(expand: Expand, child: LogicalPlan): Expand = {
- val loader = Utils.getContextOrSparkClassLoader
- try {
- val cons = loader.loadClass("org.apache.spark.sql.catalyst.plans.logical.Expand")
- .getDeclaredConstructors
- cons.head.setAccessible(true)
- cons.head.newInstance(expand.projections, expand.output, child).asInstanceOf[Expand]
- } catch {
- case e: Exception => null
- }
- }
- }
- } else {
- throw new UnsupportedOperationException(s"Spark version $version is not supported")
- }
-
-}
-
-object CodeGenerateFactory {
-
- private var codeGenerateFactory: CodeGenerateFactory = _
-
- def init(version: String): Unit = {
- if (codeGenerateFactory == null) {
- codeGenerateFactory = new CodeGenerateFactory(version)
- }
- }
-
- def getInstance(): CodeGenerateFactory = {
- codeGenerateFactory
- }
-
- def createDefaultOptimizer(conf: CatalystConf, sc: SparkContext): Optimizer = {
- val name = "org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer"
- val loader = Utils.getContextOrSparkClassLoader
- try {
- val cons = loader.loadClass(name + "$").getDeclaredConstructors
- cons.head.setAccessible(true)
- cons.head.newInstance().asInstanceOf[Optimizer]
- } catch {
- case e: Exception =>
- loader.loadClass(name).getConstructor(classOf[CatalystConf])
- .newInstance(conf).asInstanceOf[Optimizer]
- }
- }
-
-}
-
-object CodeTemplates {
-
- val spark1_6_OptimizerString =
- s"""
- import org.apache.spark.sql._;
- import org.apache.spark.sql.optimizer._;
- import org.apache.spark.sql.catalyst.plans.logical._;
- import org.apache.spark.sql.catalyst._;
- import org.apache.spark.sql.catalyst.optimizer.Optimizer;
-
- new AbstractCarbonOptimizerFactory {
- override def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf): Optimizer = {
- class CarbonOptimizer1(optimizer: Optimizer, conf: CarbonSQLConf)
- extends Optimizer(conf) {
- override val batches = Nil;
- override def execute(plan: LogicalPlan): LogicalPlan = {
- CarbonOptimizer.execute(plan, optimizer);
- }
- }
- new CarbonOptimizer1(optimizer, conf);
- }
- }
- """
-
- val defaultOptimizerString =
- s"""
- import org.apache.spark.sql._;
- import org.apache.spark.sql.optimizer._;
- import org.apache.spark.sql.catalyst.plans.logical._;
- import org.apache.spark.sql.catalyst._;
- import org.apache.spark.sql.catalyst.optimizer.Optimizer;
-
- new AbstractCarbonOptimizerFactory {
- override def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf): Optimizer = {
- class CarbonOptimizer2(optimizer: Optimizer, conf: CarbonSQLConf) extends Optimizer {
- val batches = Nil;
- override def execute(plan: LogicalPlan): LogicalPlan = {
- CarbonOptimizer.execute(plan, optimizer);
- }
- }
- new CarbonOptimizer2(optimizer, conf);
- }
- }
- """
-
- val spark1_5ExpandString =
- s"""
- import org.apache.spark.sql._
- import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan}
- new AbstractCarbonExpandFactory {
- override def createExpand(expand: Expand, child: LogicalPlan): Expand = {
- Expand(expand.bitmasks, expand.groupByExprs, expand.gid, child)
- }
- }
- """
-}
-
-abstract class AbstractCarbonOptimizerFactory {
- def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf) : Optimizer
-}
-
-abstract class AbstractCarbonExpandFactory {
- def createExpand(expand: Expand, child: LogicalPlan) : Expand
-}
-
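(Editor's note) createDefaultOptimizer above relies on the fact that a Scala object compiles to a class whose name ends in "$" and whose constructor is private. A generic, self-contained sketch of that try-the-object-then-fall-back-to-a-constructor pattern, with hypothetical names and no Spark classes involved:

    object ReflectiveInstantiationSketch {
      // Try to instantiate `className` as a Scala object (class "<name>$" with a
      // private no-arg constructor); if that fails, fall back to a public
      // one-argument constructor taking `fallbackArg`.
      def instantiate[T](className: String, fallbackArg: AnyRef): T = {
        val loader = Thread.currentThread().getContextClassLoader
        try {
          val cons = loader.loadClass(className + "$").getDeclaredConstructors
          cons.head.setAccessible(true)
          cons.head.newInstance().asInstanceOf[T]
        } catch {
          case _: Exception =>
            loader.loadClass(className)
              .getConstructor(fallbackArg.getClass)
              .newInstance(fallbackArg)
              .asInstanceOf[T]
        }
      }
    }

The deleted factory applies the same pattern: it first tries DefaultOptimizer's object form and, failing that, instantiates the class through its CatalystConf constructor.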
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
deleted file mode 100644
index d745be2..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
-import org.apache.spark.sql.types.{DataType, StringType}
-
-/**
- * Custom expression to override the deterministic property.
- */
-case class CustomDeterministicExpression(nonDt: Expression) extends Expression with Serializable {
- override def nullable: Boolean = true
-
- override def eval(input: InternalRow): Any = null
-
- override protected def genCode(ctx: CodeGenContext,
- ev: GeneratedExpressionCode): String = ev.code
- override def deterministic: Boolean = true
-
- override def dataType: DataType = StringType
-
- override def children: Seq[Expression] = Seq()
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
deleted file mode 100644
index dc2dd7b..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericMutableRow}
-
-import org.apache.carbondata.core.scan.expression.{ColumnExpression, Expression, ExpressionResult, UnknownExpression}
-import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException
-import org.apache.carbondata.core.scan.filter.intf.{ExpressionType, RowIntf}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-class SparkUnknownExpression(var sparkExp: SparkExpression)
- extends UnknownExpression with ConditionalExpression {
-
- private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
- private var isExecutor: Boolean = false
- children.addAll(getColumnList())
-
- override def evaluate(carbonRowInstance: RowIntf): ExpressionResult = {
-
- val values = carbonRowInstance.getValues.toSeq.map {
- case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
- case d: java.math.BigDecimal => org.apache.spark.sql.types.Decimal.apply(d)
- case value => value
- }
- try {
- val result = evaluateExpression(
- new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
- val sparkRes = if (isExecutor) {
- result.asInstanceOf[InternalRow].get(0, sparkExp.dataType)
- } else {
- result
- }
- new ExpressionResult(CarbonScalaUtil.convertSparkToCarbonDataType(sparkExp.dataType),
- sparkRes
- )
- } catch {
- case e: Exception => throw new FilterUnsupportedException(e.getMessage)
- }
- }
-
- override def getFilterExpressionType: ExpressionType = {
- ExpressionType.UNKNOWN
- }
-
- override def getString: String = {
- sparkExp.toString()
- }
-
- def setEvaluateExpression(evaluateExpression: (InternalRow) => Any): Unit = {
- this.evaluateExpression = evaluateExpression
- isExecutor = true
- }
-
- override def findAndSetChild(oldExpr: Expression, newExpr: Expression): Unit = {}
-
-  def getColumnList: java.util.List[ColumnExpression] = {
-    val lst = new java.util.ArrayList[ColumnExpression]()
-    getColumnListFromExpressionTree(sparkExp, lst)
-    lst
-  }
-
-  def getLiterals: java.util.List[ExpressionResult] = {
-    val lst = new java.util.ArrayList[ExpressionResult]()
-    lst
-  }
-
- def getAllColumnList: java.util.List[ColumnExpression] = {
- val lst = new java.util.ArrayList[ColumnExpression]()
- getAllColumnListFromExpressionTree(sparkExp, lst)
- lst
- }
-
- def isSingleColumn: Boolean = {
- val lst = new java.util.ArrayList[ColumnExpression]()
- getAllColumnListFromExpressionTree(sparkExp, lst)
- if (lst.size == 1 && lst.get(0).isDimension) {
- true
- } else {
- false
- }
- }
-
- def getColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
- list: java.util.List[ColumnExpression]): Unit = {
- sparkCurrentExp match {
- case carbonBoundRef: CarbonBoundReference =>
- val foundExp = list.asScala
- .find(p => p.getColumnName == carbonBoundRef.colExp.getColumnName)
- if (foundExp.isEmpty) {
- carbonBoundRef.colExp.setColIndex(list.size)
- list.add(carbonBoundRef.colExp)
- } else {
- carbonBoundRef.colExp.setColIndex(foundExp.get.getColIndex)
- }
- case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
- }
- }
-
-
- def getAllColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
- list: java.util.List[ColumnExpression]): java.util.List[ColumnExpression] = {
- sparkCurrentExp match {
- case carbonBoundRef: CarbonBoundReference => list.add(carbonBoundRef.colExp)
- case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
- }
- list
- }
-
-}
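(Editor's note) getColumnListFromExpressionTree above walks the Spark expression tree, registers each referenced column once, and points later references at the slot of the first occurrence. A toy standalone version of that walk, using hypothetical Expr/ColumnRef types rather than the Carbon classes:

    import scala.collection.mutable.ArrayBuffer

    object ColumnCollectSketch {
      sealed trait Expr { def children: Seq[Expr] }
      case class ColumnRef(name: String, var index: Int = -1) extends Expr {
        override def children: Seq[Expr] = Nil
      }
      case class FuncCall(name: String, children: Expr*) extends Expr

      // Register each distinct column name once; duplicates reuse the first slot.
      def collectColumns(e: Expr, seen: ArrayBuffer[ColumnRef]): Unit = e match {
        case c: ColumnRef =>
          seen.find(_.name == c.name) match {
            case Some(existing) => c.index = existing.index
            case None => c.index = seen.size; seen += c
          }
        case other => other.children.foreach(collectColumns(_, seen))
      }

      def main(args: Array[String]): Unit = {
        val expr = FuncCall("concat", ColumnRef("name"), FuncCall("upper", ColumnRef("name")))
        val cols = ArrayBuffer[ColumnRef]()
        collectColumns(expr, cols)
        println(cols.map(c => s"${c.name} -> slot ${c.index}").mkString(", "))  // name -> slot 0
      }
    }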