Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/30 17:46:49 UTC
[3/6] incubator-carbondata git commit: add spark2 module
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
new file mode 100644
index 0000000..2468962
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.execution.command.LoadTable
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.spark.CarbonOption
+
+class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
+
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
+ // create a new table using dataframe's schema and write its content into the table
+ sqlContext.sparkSession.sql(makeCreateTableString(dataFrame.schema,
+ new CarbonOption(parameters)))
+ writeToCarbonFile(parameters)
+ }
+
+ def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
+ writeToCarbonFile(parameters)
+ }
+
+ private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
+ val options = new CarbonOption(parameters)
+ if (options.tempCSV) {
+ loadTempCSV(options)
+ } else {
+ loadDataFrame(options)
+ }
+ }
+
+ /**
+ * First saves the DataFrame to temporary CSV files, then loads those CSV files
+ * into the carbon table.
+ * @param options carbon write options
+ */
+ private def loadTempCSV(options: CarbonOption): Unit = {
+ // temporary solution: write to csv file, then load the csv into carbon
+ val storePath = CarbonEnv.get.carbonMetastore.storePath
+ val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
+ .append("tempCSV")
+ .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)
+ .append(CarbonCommonConstants.UNDERSCORE).append(options.tableName)
+ .append(CarbonCommonConstants.UNDERSCORE).append(System.nanoTime()).toString
+ writeToTempCSVFile(tempCSVFolder, options)
+
+ val tempCSVPath = new Path(tempCSVFolder)
+ val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
+
+ def countSize(): Double = {
+ var size: Double = 0
+ val itor = fs.listFiles(tempCSVPath, true)
+ while (itor.hasNext) {
+ val f = itor.next()
+ if (f.getPath.getName.startsWith("part-")) {
+ size += f.getLen
+ }
+ }
+ size
+ }
+
+ LOGGER.info(s"temporary CSV file size: ${countSize / 1024 / 1024} MB")
+
+ try {
+ sqlContext.sql(makeLoadString(tempCSVFolder, options))
+ } finally {
+ fs.delete(tempCSVPath, true)
+ }
+ }
+
+ private def writeToTempCSVFile(tempCSVFolder: String, options: CarbonOption): Unit = {
+ var writer: DataFrameWriter[Row] =
+ dataFrame.write
+ .format(csvPackage)
+ .option("header", "false")
+ .mode(SaveMode.Overwrite)
+
+ if (options.compress) {
+ writer = writer.option("codec", "gzip")
+ }
+
+ writer.save(tempCSVFolder)
+ }
+
+ /**
+ * Loads the DataFrame directly, without writing it to temporary CSV files first.
+ * @param options carbon write options
+ */
+ private def loadDataFrame(options: CarbonOption): Unit = {
+ val header = dataFrame.columns.mkString(",")
+ LoadTable(
+ Some(options.dbName),
+ options.tableName,
+ null,
+ Seq(),
+ Map("fileheader" -> header),
+ isOverwriteExist = false,
+ null,
+ Some(dataFrame)).run(sqlContext.sparkSession)
+ }
+
+ private def csvPackage: String = "com.databricks.spark.csv.newapi"
+
+ private def convertToCarbonType(sparkType: DataType): String = {
+ sparkType match {
+ case StringType => CarbonType.STRING.getName
+ case IntegerType => CarbonType.INT.getName
+ case ByteType => CarbonType.INT.getName
+ case ShortType => CarbonType.SHORT.getName
+ case LongType => CarbonType.LONG.getName
+ case FloatType => CarbonType.DOUBLE.getName // float is widened to double
+ case DoubleType => CarbonType.DOUBLE.getName
+ case BooleanType => CarbonType.DOUBLE.getName // boolean is also stored as double here
+ case TimestampType => CarbonType.TIMESTAMP.getName
+ case other => sys.error(s"unsupported type: $other")
+ }
+ }
+
+ private def makeCreateTableString(schema: StructType, options: CarbonOption): String = {
+ val carbonSchema = schema.map { field =>
+ s"${ field.name } ${ convertToCarbonType(field.dataType) }"
+ }
+ s"""
+ CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
+ (${ carbonSchema.mkString(", ") })
+ using 'org.apache.spark.sql.CarbonRelationProvider'
+ """
+ }
+
+ private def makeLoadString(csvFolder: String, options: CarbonOption): String = {
+ if (options.useKettle) {
+ s"""
+ LOAD DATA INPATH '$csvFolder'
+ INTO TABLE ${options.dbName}.${options.tableName}
+ OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}')
+ """
+ } else {
+ s"""
+ LOAD DATA INPATH '$csvFolder'
+ INTO TABLE ${options.dbName}.${options.tableName}
+ OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}', 'USE_KETTLE' = 'false')
+ """
+ }
+ }
+
+}
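
For illustration (not part of the patch): a minimal sketch of how this writer is reached through the DataFrame API. The "tableName" and "tempCSV" option keys are assumptions based on the CarbonOption usage above; the "carbondata" short name is registered by CarbonSource later in this commit.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("carbon-write-sketch").getOrCreate()
    val df = spark.range(10).selectExpr("cast(id as string) as name", "cast(id as int) as age")

    // Going through the data source triggers CarbonSource.createRelation, which calls
    // CarbonDataFrameWriter.saveAsCarbonFile or appendToCarbonFile depending on the mode.
    df.write
      .format("carbondata")            // short name registered by CarbonSource
      .option("tableName", "person")   // assumed CarbonOption key
      .option("tempCSV", "false")      // load the DataFrame directly, skipping temp CSV files
      .mode(SaveMode.Overwrite)
      .save()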
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
new file mode 100644
index 0000000..24182ec
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonProjection}
+import org.apache.carbondata.hadoop.util.SchemaReader
+import org.apache.carbondata.scan.expression.Expression
+import org.apache.carbondata.scan.expression.logical.AndExpression
+import org.apache.carbondata.spark.CarbonFilters
+import org.apache.carbondata.spark.merger.TableMeta
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+case class CarbonDatasourceHadoopRelation(
+ sparkSession: SparkSession,
+ paths: Array[String],
+ parameters: Map[String, String],
+ tableSchema: Option[StructType])
+ extends BaseRelation with PrunedFilteredScan {
+
+ lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
+ lazy val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
+ lazy val carbonRelation: CarbonRelation = {
+ CarbonRelation(
+ carbonTable.getDatabaseName,
+ carbonTable.getFactTableName,
+ CarbonSparkUtil.createSparkMeta(carbonTable),
+ new TableMeta(absIdentifier.getCarbonTableIdentifier, paths.head, carbonTable),
+ None
+ )
+ }
+
+ override def sqlContext: SQLContext = sparkSession.sqlContext
+
+ override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
+
+ override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+ val job = new Job(new JobConf())
+ val conf = new Configuration(job.getConfiguration)
+ val filterExpression: Option[Expression] = filters.flatMap { filter =>
+ CarbonFilters.createCarbonFilter(schema, filter)
+ }.reduceOption(new AndExpression(_, _))
+
+ val projection = new CarbonProjection
+ requiredColumns.foreach(projection.addColumn)
+ CarbonInputFormat.setColumnProjection(conf, projection)
+ CarbonInputFormat.setCarbonReadSupport(classOf[SparkRowReadSupportImpl], conf)
+
+ new CarbonScanRDD[Row](sqlContext.sparkContext, projection, filterExpression.orNull,
+ absIdentifier, carbonTable)
+ }
+
+}
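
The flatMap/reduceOption pair in buildScan drops filters that cannot be translated and folds the rest into a single AND tree. A self-contained sketch of the same pattern, with a toy expression type standing in for Carbon's Expression:

    sealed trait Expr
    case class Leaf(name: String) extends Expr
    case class And(left: Expr, right: Expr) extends Expr

    // stand-in for CarbonFilters.createCarbonFilter: unconvertible filters yield None
    def convert(f: String): Option[Expr] =
      if (f.startsWith("!")) None else Some(Leaf(f))

    val filters = Array("a > 1", "!unsupported", "b = 2")
    val combined: Option[Expr] = filters.flatMap(convert).reduceOption(And(_, _))
    // combined == Some(And(Leaf("a > 1"), Leaf("b = 2"))); None if nothing converts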
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
new file mode 100644
index 0000000..d05aefd
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.errors.attachTree
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.hive.{CarbonMetastoreTypes, CarbonRelation}
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
+import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
+import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.carbon.querystatistics._
+import org.apache.carbondata.core.util.{CarbonTimeStatisticsFactory, DataTypeUtil}
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+
+/**
+ * Decodes dictionary-encoded dimension columns back to their actual values,
+ * as late as possible in the physical plan.
+ */
+case class CarbonDictionaryDecoder(
+ relations: Seq[CarbonDecoderRelation],
+ profile: CarbonProfile,
+ aliasMap: CarbonAliasDecoderRelation,
+ child: SparkPlan)
+ extends UnaryExecNode {
+
+ override val output: Seq[Attribute] = {
+ child.output.map { a =>
+ val attr = aliasMap.getOrElse(a, a)
+ val relation = relations.find(p => p.contains(attr))
+ if(relation.isDefined && canBeDecoded(attr)) {
+ val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+ val carbonDimension = carbonTable
+ .getDimensionByName(carbonTable.getFactTableName, attr.name)
+ if (carbonDimension != null &&
+ carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+ !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ val newAttr = AttributeReference(a.name,
+ convertCarbonToSparkDataType(carbonDimension,
+ relation.get.carbonRelation.carbonRelation),
+ a.nullable,
+ a.metadata)(a.exprId).asInstanceOf[Attribute]
+ newAttr
+ } else {
+ a
+ }
+ } else {
+ a
+ }
+ }
+ }
+
+
+ def canBeDecoded(attr: Attribute): Boolean = {
+ profile match {
+ case ip: IncludeProfile if ip.attributes.nonEmpty =>
+ ip.attributes
+ .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+ case ep: ExcludeProfile =>
+ !ep.attributes
+ .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+ case _ => true
+ }
+ }
+
+ def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
+ relation: CarbonRelation): types.DataType = {
+ carbonDimension.getDataType match {
+ case DataType.STRING => StringType
+ case DataType.SHORT => ShortType
+ case DataType.INT => IntegerType
+ case DataType.LONG => LongType
+ case DataType.DOUBLE => DoubleType
+ case DataType.BOOLEAN => BooleanType
+ case DataType.DECIMAL =>
+ val scale: Int = carbonDimension.getColumnSchema.getScale
+ val precision: Int = carbonDimension.getColumnSchema.getPrecision
+ if (scale == 0 && precision == 0) {
+ DecimalType(18, 2)
+ } else {
+ DecimalType(precision, scale)
+ }
+ case DataType.TIMESTAMP => TimestampType
+ case DataType.STRUCT =>
+ CarbonMetastoreTypes
+ .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
+ case DataType.ARRAY =>
+ CarbonMetastoreTypes
+ .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+ }
+ }
+
+ val getDictionaryColumnIds = {
+ val attributes = child.output
+ val dictIds: Array[(String, ColumnIdentifier, DataType)] = attributes.map { a =>
+ val attr = aliasMap.getOrElse(a, a)
+ val relation = relations.find(p => p.contains(attr))
+ if(relation.isDefined && canBeDecoded(attr)) {
+ val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+ val carbonDimension =
+ carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+ if (carbonDimension != null &&
+ carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+ !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+ carbonDimension.getDataType)
+ } else {
+ (null, null, null)
+ }
+ } else {
+ (null, null, null)
+ }
+
+ }.toArray
+ dictIds
+ }
+
+ override def doExecute(): RDD[InternalRow] = {
+ attachTree(this, "execute") {
+ val storePath = CarbonEnv.get.carbonMetastore.storePath
+ val absoluteTableIdentifiers = relations.map { relation =>
+ val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
+ (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
+ }.toMap
+
+ if (isRequiredToDecode) {
+ val dataTypes = child.output.map { attr => attr.dataType }
+ child.execute().mapPartitions { iter =>
+ val cacheProvider: CacheProvider = CacheProvider.getInstance
+ val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+ cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+ val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
+ forwardDictionaryCache)
+ val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
+ // add a task completion listener to clear the dictionaries; timely clearing
+ // is a decisive factor for the LRU eviction policy of the dictionary cache
+ val dictionaryTaskCleaner = TaskContext.get
+ dictionaryTaskCleaner.addTaskCompletionListener(context =>
+ dicts.foreach { dictionary =>
+ if (null != dictionary) {
+ dictionary.clear
+ }
+ }
+ )
+ new Iterator[InternalRow] {
+ val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+ var flag = true
+ var total = 0L
+ override final def hasNext: Boolean = iter.hasNext
+ override final def next(): InternalRow = {
+ val startTime = System.currentTimeMillis()
+ val row: InternalRow = iter.next()
+ val data = row.toSeq(dataTypes).toArray
+ dictIndex.foreach { index =>
+ if (data(index) != null) {
+ data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
+ .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+ getDictionaryColumnIds(index)._3)
+ }
+ }
+ val result = unsafeProjection(new GenericMutableRow(data))
+ total += System.currentTimeMillis() - startTime
+ result
+ }
+ }
+ }
+ } else {
+ child.execute()
+ }
+ }
+ }
+
+ private def isRequiredToDecode = {
+ getDictionaryColumnIds.find(p => p._1 != null) match {
+ case Some(value) => true
+ case _ => false
+ }
+ }
+
+ private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
+ cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
+ val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
+ if (f._2 != null) {
+ try {
+ cache.get(new DictionaryColumnUniqueIdentifier(
+ atiMap(f._1).getCarbonTableIdentifier,
+ f._2, f._3))
+ } catch {
+ case _: Throwable => null // no dictionary available: treat the column as not decodable
+ }
+ } else {
+ null
+ }
+ }
+ dicts
+ }
+
+}
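
The heart of doExecute is the per-row substitution of dictionary surrogate keys with their decoded values. A stripped-down sketch of that step, with a plain Map standing in for the Dictionary cache:

    val dictionary: Map[Int, String] = Map(1 -> "alice", 2 -> "bob")
    val dictIndex = Seq(0)                 // positions of dictionary-encoded columns
    val row: Array[Any] = Array(2, 30L)    // (surrogate key, measure)

    dictIndex.foreach { i =>
      if (row(i) != null) {
        // replace the surrogate key with the decoded value, in place
        row(i) = dictionary(row(i).asInstanceOf[Int])
      }
    }
    // row is now Array("bob", 30L)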
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
new file mode 100644
index 0000000..8028908
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.sql.hive.{CarbonMetastore, DistributionUtil}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+/**
+ * Carbon Environment for unified context
+ */
+case class CarbonEnv(carbonMetastore: CarbonMetastore)
+
+object CarbonEnv extends Logging {
+
+ @volatile private var carbonEnv: CarbonEnv = _
+
+ var initialized = false
+
+ def init(sqlContext: SQLContext): Unit = {
+ if (!initialized) {
+ val catalog = {
+ val storePath = sqlContext.sparkSession.conf.get(
+ CarbonCommonConstants.STORE_LOCATION, "/user/hive/warehouse/carbonstore")
+ new CarbonMetastore(sqlContext.sparkSession.conf, storePath)
+ }
+ carbonEnv = CarbonEnv(catalog)
+ DistributionUtil.numExistingExecutors = sqlContext.sparkContext.schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend => b.getExecutorIds().length
+ case _ => 0
+ }
+ initialized = true
+ }
+ }
+
+ def get: CarbonEnv = {
+ carbonEnv
+ }
+}
+
+
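
CarbonEnv is a process-wide singleton: init is guarded by the initialized flag (note the flag itself is not synchronized, so init is expected to run on the driver before concurrent use). A minimal usage sketch, assuming the store location is set in the session conf:

    import org.apache.spark.sql.{CarbonEnv, SparkSession}

    val spark = SparkSession.builder().appName("carbon-env-sketch").getOrCreate()
    CarbonEnv.init(spark.sqlContext)               // idempotent after the first call
    val metastore = CarbonEnv.get.carbonMetastore  // shared CarbonMetastore instance
    println(metastore.storePath)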
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index cb0b9a5..9e42b44 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -17,15 +17,16 @@
package org.apache.spark.sql
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.hive.CarbonRelation
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.scan.model._
import org.apache.carbondata.spark.CarbonFilters
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.hive.CarbonRelation
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
case class CarbonScan(
var attributesRaw: Seq[Attribute],
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
new file mode 100644
index 0000000..fb87ba2
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.language.implicitConversions
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
+import org.apache.spark.sql.execution.command.{CreateTable, Field}
+import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.spark.CarbonOption
+
+/**
+ * Carbon relation provider compliant to data source api.
+ * Creates carbon relations
+ */
+class CarbonSource extends CreatableRelationProvider
+ with SchemaRelationProvider with DataSourceRegister {
+
+ override def shortName(): String = "carbondata"
+
+ // called by any write operation like INSERT INTO DDL or DataFrame.write API
+ override def createRelation(
+ sqlContext: SQLContext,
+ mode: SaveMode,
+ parameters: Map[String, String],
+ data: DataFrame): BaseRelation = {
+ CarbonEnv.init(sqlContext)
+ // users should not specify 'path': only one store is supported in carbon currently;
+ // this limitation can be removed once multi-store is supported
+ require(!parameters.contains("path"), "'path' should not be specified, " +
+ "carbon files are stored at the 'storePath' specified when creating CarbonContext")
+
+ val options = new CarbonOption(parameters)
+ val storePath = sqlContext.sparkSession.conf.get(CarbonCommonConstants.STORE_LOCATION)
+ val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
+ val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ .exists(tablePath)
+ val (doSave, doAppend) = (mode, isExists) match {
+ case (SaveMode.ErrorIfExists, true) =>
+ sys.error(s"ErrorIfExists mode, path $storePath already exists.")
+ case (SaveMode.Overwrite, true) =>
+ sqlContext.sparkSession.sql(s"DROP TABLE IF EXISTS ${options.dbName}.${options.tableName}")
+ (true, false)
+ case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
+ (true, false)
+ case (SaveMode.Append, _) =>
+ (false, true)
+ case (SaveMode.Ignore, exists) =>
+ (!exists, false)
+ }
+
+ if (doSave) {
+ // create the table and save the data (Overwrite, or the table does not exist yet)
+ new CarbonDataFrameWriter(sqlContext, data).saveAsCarbonFile(parameters)
+ } else if (doAppend) {
+ new CarbonDataFrameWriter(sqlContext, data).appendToCarbonFile(parameters)
+ }
+
+ createRelation(sqlContext, parameters, data.schema)
+ }
+
+ // called by DDL operation with a USING clause
+ override def createRelation(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ dataSchema: StructType): BaseRelation = {
+ CarbonEnv.init(sqlContext)
+ addLateDecodeOptimization(sqlContext.sparkSession)
+ val path = createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
+ CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), parameters,
+ Option(dataSchema))
+
+ }
+
+ private def addLateDecodeOptimization(ss: SparkSession): Unit = {
+ ss.sessionState.experimentalMethods.extraStrategies = Seq(new CarbonLateDecodeStrategy)
+ ss.sessionState.experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+ }
+
+ private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
+ dataSchema: StructType): String = {
+ val (dbName, tableName) = parameters.get("path") match {
+ case Some(path) =>
+ val p = path.split(File.separator)
+ ("default", p(p.length - 1))
+ case _ => throw new Exception("'path' is required to derive the database and table name of the carbon table")
+ }
+ try {
+ CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession)
+ CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
+ } catch {
+ case ex: NoSuchTableException =>
+ val fields = dataSchema.map { col =>
+ val column = col.name
+ val dataType = Option(col.dataType.toString)
+ val name = Option(col.name)
+ val f: Field = Field(column, dataType, name, None, null)
+ // decimal columns arrive with a type string like "decimal(10,0)", so check the
+ // prefix, extract the precision and scale, and reset the data type to plain decimal
+ if (f.dataType.getOrElse("").startsWith("decimal")) {
+ val (precision, scale) = TableCreator.getScaleAndPrecision(col.dataType.toString)
+ f.precision = precision
+ f.scale = scale
+ f.dataType = Some("decimal")
+ }
+ f
+ }
+ val map = scala.collection.mutable.Map[String, String]()
+ parameters.foreach { x => map.put(x._1, x._2) }
+ val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map)
+ CreateTable(cm).run(sparkSession)
+ CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
+ case ex => throw ex
+ }
+ }
+
+}
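
createRelation maps the pair (SaveMode, table already exists?) onto a create-vs-append decision. The same table, extracted as a standalone function for clarity (the function name is illustrative, not part of the patch):

    import org.apache.spark.sql.SaveMode

    // returns (doSave, doAppend)
    def resolveAction(mode: SaveMode, exists: Boolean): (Boolean, Boolean) =
      (mode, exists) match {
        case (SaveMode.ErrorIfExists, true)  => sys.error("table already exists")
        case (SaveMode.Overwrite, true)      => (true, false)   // drop, then recreate
        case (SaveMode.Overwrite, false) |
             (SaveMode.ErrorIfExists, false) => (true, false)   // fresh create
        case (SaveMode.Append, _)            => (false, true)   // load into existing table
        case (SaveMode.Ignore, e)            => (!e, false)     // no-op if it already exists
      }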
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
new file mode 100644
index 0000000..284af3d
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.{ArrayList, List}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericMutableRow}
+
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.scan.expression.{ColumnExpression, ExpressionResult, UnknownExpression}
+import org.apache.carbondata.scan.expression.conditional.ConditionalExpression
+import org.apache.carbondata.scan.expression.exception.FilterUnsupportedException
+import org.apache.carbondata.scan.filter.intf.{ExpressionType, RowIntf}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+class SparkUnknownExpression(var sparkExp: SparkExpression)
+ extends UnknownExpression with ConditionalExpression {
+
+ private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
+ private var isExecutor: Boolean = false
+ children.addAll(getColumnList())
+
+ override def evaluate(carbonRowInstance: RowIntf): ExpressionResult = {
+
+ val values = carbonRowInstance.getValues.toSeq.map {
+ case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
+ case d: java.math.BigDecimal =>
+ val javaDecVal = new java.math.BigDecimal(d.toString)
+ val scalaDecVal = new scala.math.BigDecimal(javaDecVal)
+ val decConverter = new org.apache.spark.sql.types.Decimal()
+ decConverter.set(scalaDecVal)
+ case value => value
+ }
+ try {
+ val result = evaluateExpression(
+ new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
+ val sparkRes = if (isExecutor) {
+ result.asInstanceOf[InternalRow].get(0, sparkExp.dataType)
+ } else {
+ result
+ }
+ new ExpressionResult(CarbonScalaUtil.convertSparkToCarbonDataType(sparkExp.dataType),
+ sparkRes
+ )
+ } catch {
+ case e: Exception => throw new FilterUnsupportedException(e.getMessage)
+ }
+ }
+
+ override def getFilterExpressionType: ExpressionType = {
+ ExpressionType.UNKNOWN
+ }
+
+ override def getString: String = {
+ sparkExp.toString()
+ }
+
+ def setEvaluateExpression(evaluateExpression: (InternalRow) => Any): Unit = {
+ this.evaluateExpression = evaluateExpression
+ isExecutor = true
+ }
+
+ def getColumnList: java.util.List[ColumnExpression] = {
+
+ val lst = new java.util.ArrayList[ColumnExpression]()
+ getColumnListFromExpressionTree(sparkExp, lst)
+ lst
+ }
+ def getLiterals: java.util.List[ExpressionResult] = {
+
+ val lst = new java.util.ArrayList[ExpressionResult]()
+ lst
+ }
+
+ def getAllColumnList: java.util.List[ColumnExpression] = {
+ val lst = new java.util.ArrayList[ColumnExpression]()
+ getAllColumnListFromExpressionTree(sparkExp, lst)
+ lst
+ }
+
+ def isSingleDimension: Boolean = {
+ val lst = new java.util.ArrayList[ColumnExpression]()
+ getAllColumnListFromExpressionTree(sparkExp, lst)
+ if (lst.size == 1 && lst.get(0).isDimension) {
+ true
+ } else {
+ false
+ }
+ }
+
+ def getColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
+ list: java.util.List[ColumnExpression]): Unit = {
+ // walks the expression tree recursively; the leaf handling that would add
+ // ColumnExpressions to the list is not implemented here, so the list stays empty
+ sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
+ }
+
+
+ def getAllColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
+ list: List[ColumnExpression]): List[ColumnExpression] = {
+ sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
+ list
+ }
+
+ def isDirectDictionaryColumns: Boolean = {
+ val lst = new ArrayList[ColumnExpression]()
+ getAllColumnListFromExpressionTree(sparkExp, lst)
+ if (lst.get(0).getCarbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ true
+ } else {
+ false
+ }
+ }
+}
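
Before calling the wrapped Spark expression, evaluate converts raw row values into Spark's internal representations (UTF8String for strings, Decimal for BigDecimal). A minimal sketch of just that conversion:

    import org.apache.spark.sql.types.Decimal
    import org.apache.spark.unsafe.types.UTF8String

    val raw: Seq[Any] = Seq("abc", new java.math.BigDecimal("1.50"), 42)
    val converted = raw.map {
      case s: String               => UTF8String.fromString(s)          // internal string type
      case d: java.math.BigDecimal => Decimal(scala.math.BigDecimal(d)) // internal decimal
      case other                   => other
    }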
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
new file mode 100644
index 0000000..14decdb
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.regex.{Matcher, Pattern}
+
+import scala.collection.mutable.{LinkedHashSet, Map}
+
+import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField, TableModel}
+
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
+object TableCreator {
+
+ // detects whether complex dimension is part of dictionary_exclude
+ def isComplexDimDictionaryExclude(dimensionDataType: String): Boolean = {
+ val dimensionType = Array("array", "struct")
+ dimensionType.exists(x => x.equalsIgnoreCase(dimensionDataType))
+ }
+
+ // detects whether a string or timestamp column is part of dictionary_exclude
+ def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
+ val dataTypes = Array("string", "timestamp")
+ dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
+ }
+
+ // detects whether the given data type is treated as a dimension data type
+ def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = {
+ val dimensionType =
+ Array("string", "stringtype", "array", "arraytype", "struct",
+ "structtype", "timestamp", "timestamptype")
+ dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype))
+ }
+
+ protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
+ tableProperties: Map[String, String]):
+ (Seq[Field], Seq[String]) = {
+ var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
+ var dictExcludeCols: Array[String] = Array[String]()
+ var noDictionaryDims: Seq[String] = Seq[String]()
+ var dictIncludeCols: Seq[String] = Seq[String]()
+
+ // All excluded cols should be there in create table cols
+ if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
+ dictExcludeCols =
+ tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+ dictExcludeCols
+ .map { dictExcludeCol =>
+ if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
+ val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
+ " does not exist in table. Please check create table statement."
+ throw new MalformedCarbonCommandException(errormsg)
+ } else {
+ val dataType = fields.find(x =>
+ x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
+ if (isComplexDimDictionaryExclude(dataType)) {
+ val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " +
+ dictExcludeCol
+ throw new MalformedCarbonCommandException(errormsg)
+ } else if (!isStringAndTimestampColDictionaryExclude(dataType)) {
+ val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
+ " data type column: " + dictExcludeCol
+ throw new MalformedCarbonCommandException(errorMsg)
+ }
+ }
+ }
+ }
+ // All included cols should be there in create table cols
+ if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
+ dictIncludeCols =
+ tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
+ dictIncludeCols.map { distIncludeCol =>
+ if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
+ val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
+ " does not exist in table. Please check create table statement."
+ throw new MalformedCarbonCommandException(errormsg)
+ }
+ }
+ }
+
+ // include cols should contain exclude cols
+ dictExcludeCols.foreach { dicExcludeCol =>
+ if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
+ val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
+ " with DICTIONARY_INCLUDE. Please check create table statement."
+ throw new MalformedCarbonCommandException(errormsg)
+ }
+ }
+
+ // by default, consider all string columns as dimensions; if a column is in
+ // dictionary_exclude, also add it to the noDictionaryDims list. all dictionary
+ // exclude/include columns are treated as dimensions
+ fields.foreach(field => {
+
+ if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
+ if (DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.TIMESTAMP) {
+ noDictionaryDims :+= field.column
+ }
+ dimFields += field
+ } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
+ dimFields += (field)
+ } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
+ dimFields += (field)
+ }
+ }
+ )
+
+ (dimFields.toSeq, noDictionaryDims)
+ }
+
+ /**
+ * Extract the measure columns. By default, all non-string columns are measures.
+ *
+ * @param fields fields from the create table statement
+ * @param tableProperties table properties specified in the statement
+ * @return the measure fields
+ */
+ protected def extractMsrColsFromFields(fields: Seq[Field],
+ tableProperties: Map[String, String]): Seq[Field] = {
+ var msrFields: Seq[Field] = Seq[Field]()
+ var dictIncludedCols: Array[String] = Array[String]()
+ var dictExcludedCols: Array[String] = Array[String]()
+
+ // get all included cols
+ if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
+ dictIncludedCols =
+ tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
+ }
+
+ // get all excluded cols
+ if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
+ dictExcludedCols =
+ tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+ }
+
+ // by default, consider all non-string columns as measures; all dictionary
+ // include/exclude columns are treated as dimensions
+ fields.foreach(field => {
+ if (!isDetectAsDimentionDatatype(field.dataType.get)) {
+ if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
+ !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
+ msrFields :+= field
+ }
+ }
+ })
+
+ msrFields
+ }
+
+ def getKey(parentColumnName: Option[String],
+ columnName: String): (String, String) = {
+ if (parentColumnName.isDefined) {
+ if (columnName == "val") {
+ (parentColumnName.get, parentColumnName.get + "." + columnName)
+ } else {
+ (parentColumnName.get + "." + columnName, parentColumnName.get + "." + columnName)
+ }
+ } else {
+ (columnName, columnName)
+ }
+ }
+
+ protected def fillColumnProperty(
+ parentColumnName: Option[String],
+ columnName: String,
+ tableProperties: Map[String, String],
+ colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
+ val (tblPropKey, colProKey) = getKey(parentColumnName, columnName)
+ val colProps = CommonUtil.getColumnProperties(tblPropKey, tableProperties)
+ if (colProps.isDefined) {
+ colPropMap.put(colProKey, colProps.get)
+ }
+ }
+
+ protected def fillAllChildrenColumnProperty(
+ parent: String,
+ fieldChildren: Option[List[Field]],
+ tableProperties: Map[String, String],
+ colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
+ fieldChildren.foreach { fields =>
+ fields.foreach { field =>
+ fillColumnProperty(Some(parent), field.column, tableProperties, colPropMap)
+ }
+ }
+ }
+
+ protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):
+ java.util.Map[String, java.util.List[ColumnProperty]] = {
+ val colPropMap = new java.util.HashMap[String, java.util.List[ColumnProperty]]()
+ fields.foreach { field =>
+ if (field.children.isDefined && field.children.get != null) {
+ fillAllChildrenColumnProperty(field.column, field.children, tableProperties, colPropMap)
+ } else {
+ fillColumnProperty(None, field.column, tableProperties, colPropMap)
+ }
+ }
+ colPropMap
+ }
+
+ def rearrangedColumnGroup(colGroup: String, dims: Seq[Field]): String = {
+ // if the columns in the column group are not in schema order, rearrange them
+ var colGrpFieldIndx: Seq[Int] = Seq[Int]()
+ colGroup.split(',').map(_.trim).foreach { x =>
+ dims.zipWithIndex.foreach { dim =>
+ if (dim._1.column.equalsIgnoreCase(x)) {
+ colGrpFieldIndx :+= dim._2
+ }
+ }
+ }
+ // sort it
+ colGrpFieldIndx = colGrpFieldIndx.sorted
+ // check that the columns in the column group are contiguous in schema order
+ if (!checkIfInSequence(colGrpFieldIndx)) {
+ throw new MalformedCarbonCommandException("Invalid column group:" + colGroup)
+ }
+ def checkIfInSequence(colGrpFieldIndx: Seq[Int]): Boolean = {
+ for (i <- 0 until (colGrpFieldIndx.length - 1)) {
+ if ((colGrpFieldIndx(i + 1) - colGrpFieldIndx(i)) != 1) {
+ throw new MalformedCarbonCommandException(
+ "Invalid column group,column in group should be contiguous as per schema.")
+ }
+ }
+ true
+ }
+ val colGrpNames: StringBuilder = StringBuilder.newBuilder
+ for (i <- colGrpFieldIndx.indices) {
+ colGrpNames.append(dims(colGrpFieldIndx(i)).column)
+ if (i < (colGrpFieldIndx.length - 1)) {
+ colGrpNames.append(",")
+ }
+ }
+ colGrpNames.toString()
+ }
+
+ /**
+ * Extract the column groups configuration from table properties.
+ * Based on this, row groups of fields will be determined.
+ *
+ * @param tableProperties table properties specified in the statement
+ * @return column groups arranged in schema order
+ */
+ protected def updateColumnGroupsInField(tableProperties: Map[String, String],
+ noDictionaryDims: Seq[String],
+ msrs: Seq[Field],
+ dims: Seq[Field]): Seq[String] = {
+ if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) {
+
+ var splittedColGrps: Seq[String] = Seq[String]()
+ val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
+
+ // column groups are specified in table properties like "(col1,col2),(col3,col4)".
+ // first split the value on the parentheses, so that the above yields the two
+ // strings "col1,col2" and "col3,col4"
+ val m: Matcher = Pattern.compile("\\(([^)]+)\\)").matcher(nonSplitCols)
+ while (m.find()) {
+ val oneGroup: String = m.group(1)
+ CommonUtil.validateColumnGroup(oneGroup, noDictionaryDims, msrs, splittedColGrps, dims)
+ val arrangedColGrp = rearrangedColumnGroup(oneGroup, dims)
+ splittedColGrps :+= arrangedColGrp
+ }
+ // arrange the groups in schema order for further handling
+ CommonUtil.arrangeColGrpsInSchemaOrder(splittedColGrps, dims)
+ } else {
+ null
+ }
+ }
+
+ private def reorderDimensions(dims: Seq[Field]): Seq[Field] = {
+ var complexDimensions: Seq[Field] = Seq()
+ var dimensions: Seq[Field] = Seq()
+ dims.foreach { dimension =>
+ dimension.dataType.getOrElse("NIL") match {
+ case "Array" => complexDimensions = complexDimensions :+ dimension
+ case "Struct" => complexDimensions = complexDimensions :+ dimension
+ case _ => dimensions = dimensions :+ dimension
+ }
+ }
+ dimensions ++ complexDimensions
+ }
+
+ /**
+ * Extract the columns for which the inverted index is disabled.
+ * By default, all dimensions use an inverted index.
+ *
+ * @param fields fields from the create table statement
+ * @param tableProperties table properties specified in the statement
+ * @return columns marked with NO_INVERTED_INDEX
+ */
+ protected def extractNoInvertedIndexColumns(fields: Seq[Field],
+ tableProperties: Map[String, String]):
+ Seq[String] = {
+ // check whether the column name is in fields
+ var noInvertedIdxColsProps: Array[String] = Array[String]()
+ var noInvertedIdxCols: Seq[String] = Seq[String]()
+
+ if (tableProperties.get("NO_INVERTED_INDEX").isDefined) {
+ noInvertedIdxColsProps =
+ tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
+ noInvertedIdxColsProps
+ .map { noInvertedIdxColProp =>
+ if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
+ val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
+ " does not exist in table. Please check create table statement."
+ throw new MalformedCarbonCommandException(errormsg)
+ }
+ }
+ }
+ // deduplicate the configured columns
+ val distinctCols = noInvertedIdxColsProps.toSet
+ // extract the no inverted index columns
+ fields.foreach(field => {
+ if (distinctCols.exists(x => x.equalsIgnoreCase(field.column))) {
+ noInvertedIdxCols :+= field.column
+ }
+ }
+ )
+ noInvertedIdxCols
+ }
+
+ private def normalizeType(field: Field): Field = {
+ val dataType = field.dataType.getOrElse("NIL")
+ dataType match {
+ case "string" => Field(field.column, Some("String"), field.name, Some(null), field.parent,
+ field.storeType
+ )
+ case "integer" | "int" => Field(field.column, Some("Integer"), field.name, Some(null),
+ field.parent, field.storeType
+ )
+ case "long" => Field(field.column, Some("Long"), field.name, Some(null), field.parent,
+ field.storeType
+ )
+ case "double" => Field(field.column, Some("Double"), field.name, Some(null), field.parent,
+ field.storeType
+ )
+ case "timestamp" => Field(field.column, Some("Timestamp"), field.name, Some(null),
+ field.parent, field.storeType
+ )
+ case "numeric" => Field(field.column, Some("Numeric"), field.name, Some(null), field.parent,
+ field.storeType
+ )
+ case "array" => Field(field.column, Some("Array"), field.name,
+ field.children.map(f => f.map(normalizeType(_))),
+ field.parent, field.storeType
+ )
+ case "struct" => Field(field.column, Some("Struct"), field.name,
+ field.children.map(f => f.map(normalizeType(_))),
+ field.parent, field.storeType
+ )
+ case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
+ field.storeType
+ )
+ case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
+ field.storeType, field.precision, field.scale
+ )
+ // a nested data type may contain a child type such as decimal(10,0); if present,
+ // extract the precision and scale and reset the data type to Decimal
+ case _ if (dataType.startsWith("decimal")) =>
+ val (precision, scale) = getScaleAndPrecision(dataType)
+ Field(field.column,
+ Some("Decimal"),
+ field.name,
+ Some(null),
+ field.parent,
+ field.storeType, precision,
+ scale
+ )
+ case _ =>
+ field
+ }
+ }
+
+ private def appendParentForEachChild(field: Field, parentName: String): Field = {
+ field.dataType.getOrElse("NIL") match {
+ case "String" => Field(parentName + "." + field.column, Some("String"),
+ Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+ case "Integer" => Field(parentName + "." + field.column, Some("Integer"),
+ Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+ case "Long" => Field(parentName + "." + field.column, Some("Long"),
+ Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+ case "Double" => Field(parentName + "." + field.column, Some("Double"),
+ Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+ case "Timestamp" => Field(parentName + "." + field.column, Some("Timestamp"),
+ Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+ case "Numeric" => Field(parentName + "." + field.column, Some("Numeric"),
+ Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+ case "Array" => Field(parentName + "." + field.column, Some("Array"),
+ Some(parentName + "." + field.name.getOrElse(None)),
+ field.children
+ .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
+ parentName)
+ case "Struct" => Field(parentName + "." + field.column, Some("Struct"),
+ Some(parentName + "." + field.name.getOrElse(None)),
+ field.children
+ .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
+ parentName)
+ case "BigInt" => Field(parentName + "." + field.column, Some("BigInt"),
+ Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+ case "Decimal" => Field(parentName + "." + field.column, Some("Decimal"),
+ Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName,
+ field.storeType, field.precision, field.scale)
+ case _ => field
+ }
+ }
+
+ private def addParent(field: Field): Field = {
+ field.dataType.getOrElse("NIL") match {
+ case "Array" => Field(field.column, Some("Array"), field.name,
+ field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
+ field.storeType)
+ case "Struct" => Field(field.column, Some("Struct"), field.name,
+ field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
+ field.storeType)
+ case _ => field
+ }
+ }
+
+ def getScaleAndPrecision(dataType: String): (Int, Int) = {
+ val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
+ m.find()
+ // group(1) holds "precision,scale", e.g. "10,2" for decimal(10,2)
+ val precisionAndScale = m.group(1).split(",")
+ (Integer.parseInt(precisionAndScale(0).trim), Integer.parseInt(precisionAndScale(1).trim))
+ }
+
+ def prepareTableModel(
+ ifNotExistPresent: Boolean,
+ dbName: Option[String],
+ tableName: String,
+ fields: Seq[Field],
+ partitionCols: Seq[PartitionerField],
+ tableProperties: Map[String, String]): TableModel = {
+
+ val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
+ fields, tableProperties)
+ if (dims.isEmpty) {
+ val fullTableName =
+ s"${ dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME) }.$tableName"
+ throw new MalformedCarbonCommandException(
+ s"Table $fullTableName cannot be created without key columns. Please use " +
+ "DICTIONARY_INCLUDE or DICTIONARY_EXCLUDE to set at least one key column " +
+ "if all specified columns are numeric types")
+ }
+ val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
+
+ // column properties
+ val colProps = extractColumnProperties(fields, tableProperties)
+ // get column groups configuration from table properties.
+ val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties,
+ noDictionaryDims, msrs, dims)
+
+ // get no inverted index columns from table properties.
+ val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
+
+ // validate the tableBlockSize from table properties
+ CommonUtil.validateTableBlockSize(tableProperties)
+
+ TableModel(ifNotExistPresent,
+ dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
+ dbName,
+ tableName,
+ tableProperties,
+ reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
+ msrs.map(f => normalizeType(f)),
+ Option(noDictionaryDims),
+ Option(noInvertedIdxCols),
+ groupCols,
+ Some(colProps))
+ }
+
+}
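
Two of the parsing helpers above are easy to exercise standalone. getScaleAndPrecision pulls precision and scale out of a decimal type string, and updateColumnGroupsInField relies on the parenthesis regex shown at its top:

    // precision/scale extraction: returns (precision, scale)
    val (precision, scale) = TableCreator.getScaleAndPrecision("decimal(10,2)")
    assert(precision == 10 && scale == 2)

    // the column-group regex splits "(col1,col2),(col3,col4)" into its groups
    import java.util.regex.Pattern
    val m = Pattern.compile("\\(([^)]+)\\)").matcher("(col1,col2),(col3,col4)")
    while (m.find()) println(m.group(1))   // prints "col1,col2" then "col3,col4"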
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index c2e3915..4ae8d61 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -17,114 +17,24 @@
package org.apache.spark.sql.execution
-import scala.collection.JavaConverters._
-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression}
-import org.apache.spark.sql._
+import org.apache.spark.sql.{CarbonDictionaryCatalystDecoder, CarbonDictionaryDecoder}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
-import org.apache.spark.sql.types.IntegerType
-///**
-// * Carbon strategy for late decode (convert dictionary key to value as late as possible), which
-// * can improve the aggregation performance and reduce memory usage
-// */
-//private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
-// def apply(plan: LogicalPlan): Seq[SparkPlan] = {
-// plan match {
-// case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
-// CarbonDictionaryDecoder(relations,
-// profile,
-// aliasMap,
-// planLater(child)
-// ) :: Nil
-// case _ => Nil
-// }
-// }
-// /**
-// * Create carbon scan
-// */
-// private def carbonRawScan(projectList: Seq[NamedExpression],
-// predicates: Seq[Expression],
-// logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
-//
-// val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-// val tableName: String =
-// relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
-// // Check out any expressions are there in project list. if they are present then we need to
-// // decode them as well.
-// val projectSet = AttributeSet(projectList.flatMap(_.references))
-// val scan = CarbonScan(projectSet.toSeq, relation.carbonRelation, predicates)
-// projectList.map {
-// case attr: AttributeReference =>
-// case Alias(attr: AttributeReference, _) =>
-// case others =>
-// others.references.map{f =>
-// val dictionary = relation.carbonRelation.metaData.dictionaryMap.get(f.name)
-// if (dictionary.isDefined && dictionary.get) {
-// scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference])
-// }
-// }
-// }
-// if (scan.attributesNeedToDecode.size() > 0) {
-// val decoder = getCarbonDecoder(logicalRelation,
-// sc,
-// tableName,
-// scan.attributesNeedToDecode.asScala.toSeq,
-// scan)
-// if (scan.unprocessedExprs.nonEmpty) {
-// val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
-// ProjectExec(projectList, filterCondToAdd.map(FilterExec(_, decoder)).getOrElse(decoder))
-// } else {
-// ProjectExec(projectList, decoder)
-// }
-// } else {
-// ProjectExec(projectList, scan)
-// }
-// }
-//
-// def getCarbonDecoder(logicalRelation: LogicalRelation,
-// sc: SQLContext,
-// tableName: String,
-// projectExprsNeedToDecode: Seq[Attribute],
-// scan: CarbonScan): CarbonDictionaryDecoder = {
-// val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
-// logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
-// val attrs = projectExprsNeedToDecode.map { attr =>
-// val newAttr = AttributeReference(attr.name,
-// attr.dataType,
-// attr.nullable,
-// attr.metadata)(attr.exprId)
-// relation.addAttribute(newAttr)
-// newAttr
-// }
-// CarbonDictionaryDecoder(Seq(relation), IncludeProfile(attrs),
-// CarbonAliasDecoderRelation(), scan)(sc)
-// }
-//
-// def isDictionaryEncoded(projectExprsNeedToDecode: Seq[Attribute],
-// relation: CarbonDatasourceHadoopRelation): Boolean = {
-// var isEncoded = false
-// projectExprsNeedToDecode.foreach { attr =>
-// if (relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false)) {
-// isEncoded = true
-// }
-// }
-// isEncoded
-// }
-//
-// def updateDataType(attr: AttributeReference,
-// relation: CarbonDatasourceHadoopRelation,
-// allAttrsNotDecode: java.util.Set[Attribute]): AttributeReference = {
-// if (relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false) &&
-// !allAttrsNotDecode.asScala.exists(p => p.name.equals(attr.name))) {
-// AttributeReference(attr.name,
-// IntegerType,
-// attr.nullable,
-// attr.metadata)(attr.exprId, attr.qualifiers)
-// } else {
-// attr
-// }
-// }
-//}
+/**
+ * Carbon strategy for late decode (converting dictionary keys to values as late as
+ * possible), which can improve aggregation performance and reduce memory usage.
+ */
+private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+ plan match {
+ case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
+ CarbonDictionaryDecoder(relations,
+ profile,
+ aliasMap,
+ planLater(child)
+ ) :: Nil
+ case _ => Nil
+ }
+ }
+
+}
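
The restored strategy follows the standard SparkStrategy shape: match one logical node, emit one physical node, otherwise return Nil. Registration goes through Spark's experimental methods, as CarbonSource.addLateDecodeOptimization does above; a sketch (the strategy is private[sql], so this code is assumed to live in the org.apache.spark.sql package):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.CarbonLateDecodeStrategy

    val spark = SparkSession.builder().appName("late-decode-sketch").getOrCreate()
    // plug the strategy into the planner; the optimizer rule is wired in the same way
    spark.experimental.extraStrategies =
      spark.experimental.extraStrategies :+ new CarbonLateDecodeStrategy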