Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/10/31 07:00:02 UTC
[04/22] carbondata git commit: [CARBONDATA-1597] Remove spark1 integration
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala
deleted file mode 100644
index cd42fba..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.SqlParser
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-
-
-/**
- * This class contains all carbon hive metadata related utilities
- */
-object CarbonHiveMetadataUtil {
-
- @transient
- val LOGGER = LogServiceFactory.getLogService(CarbonHiveMetadataUtil.getClass.getName)
-
-
- /**
- * This method invalidates the table in HiveMetastoreCatalog before dropping the table.
- *
- * @param databaseName
- * @param tableName
- * @param sqlContext
- */
- def invalidateAndDropTable(databaseName: String,
- tableName: String,
- sqlContext: SQLContext): Unit = {
- val hiveContext = sqlContext.asInstanceOf[HiveContext]
- val tableWithDb = databaseName + "." + tableName
- val tableIdent = SqlParser.parseTableIdentifier(tableWithDb)
- try {
- hiveContext.catalog.invalidateTable(tableIdent)
- hiveContext.runSqlHive(s"DROP TABLE IF EXISTS $databaseName.$tableName")
- } catch {
- case e: Exception =>
- LOGGER.audit(
- s"Error while deleting the table $databaseName.$tableName during carbon table drop: " +
- e.getMessage)
- }
- }
-
-}
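
For reference, the invalidate-then-drop behaviour removed above can be sketched standalone. This is a minimal sketch under stated assumptions, not the original implementation: the Catalog trait is a hypothetical stand-in for the Spark 1.x HiveContext catalog, and println stands in for Carbon's audit logger.

    object InvalidateAndDropSketch {

      // hypothetical stand-in for the Spark 1.x HiveContext catalog API
      trait Catalog {
        def invalidateTable(db: String, table: String): Unit
        def runSqlHive(sql: String): Unit
      }

      def invalidateAndDropTable(catalog: Catalog, db: String, table: String): Unit = {
        try {
          // invalidate cached metadata first so later lookups do not see a stale entry
          catalog.invalidateTable(db, table)
          catalog.runSqlHive(s"DROP TABLE IF EXISTS $db.$table")
        } catch {
          // failures are logged and swallowed, mirroring the audit-log behaviour above
          case e: Exception =>
            println(s"Error while deleting table $db.$table during drop: ${e.getMessage}")
        }
      }
    }
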
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
deleted file mode 100644
index 4d5d39a..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ /dev/null
@@ -1,562 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io._
-import java.util.UUID
-
-import scala.Array.canBuildFrom
-import scala.collection.mutable.ArrayBuffer
-import scala.language.implicitConversions
-import scala.util.parsing.combinator.RegexParsers
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.Partitioner
-import org.apache.spark.sql.hive.client.ClientInterface
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.reader.ThriftReader
-import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.core.writer.ThriftWriter
-import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.processing.merger.TableMeta
-
-case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])
-
-case class CarbonMetaData(dims: Seq[String],
- msrs: Seq[String],
- carbonTable: CarbonTable,
- dictionaryMap: DictionaryMap)
-
-object CarbonMetastore {
-
- def readSchemaFileToThriftTable(schemaFilePath: String): TableInfo = {
- val createTBase = new ThriftReader.TBaseCreator() {
- override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
- new TableInfo()
- }
- }
- val thriftReader = new ThriftReader(schemaFilePath, createTBase)
- var tableInfo: TableInfo = null
- try {
- thriftReader.open()
- tableInfo = thriftReader.read().asInstanceOf[TableInfo]
- } finally {
- thriftReader.close()
- }
- tableInfo
- }
-
- def writeThriftTableToSchemaFile(schemaFilePath: String, tableInfo: TableInfo): Unit = {
- val thriftWriter = new ThriftWriter(schemaFilePath, false)
- try {
- thriftWriter.open()
- thriftWriter.write(tableInfo)
- } finally {
- thriftWriter.close()
- }
- }
-
-}
-
-case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
- def get(name: String): Option[Boolean] = {
- dictionaryMap.get(name.toLowerCase)
- }
-}
-
-class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
- client: ClientInterface, queryId: String) extends HiveMetastoreCatalog(client, hiveContext) {
-
- @transient
- val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
-
- val tableModifiedTimeStore = new java.util.HashMap[String, Long]()
- tableModifiedTimeStore
- .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis())
-
- val metadata = loadMetadata(storePath)
-
- def getTableCreationTime(databaseName: String, tableName: String): Long = {
- val tableMeta = metadata.tablesMeta.filter(
- c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(databaseName) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
- val tableCreationTime = tableMeta.head.carbonTable.getTableLastUpdatedTime
- tableCreationTime
- }
-
- def lookupRelation1(dbName: Option[String],
- tableName: String)(sqlContext: SQLContext): LogicalPlan = {
- lookupRelation1(TableIdentifier(tableName, dbName))(sqlContext)
- }
-
- def lookupRelation1(tableIdentifier: TableIdentifier,
- alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = {
- checkSchemasModifiedTimeAndReloadTables()
- val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
- val tables = getTableFromMetadata(database, tableIdentifier.table)
- tables match {
- case Some(t) =>
- CarbonRelation(database, tableIdentifier.table,
- CarbonSparkUtil.createSparkMeta(t.carbonTable), t, alias)(sqlContext)
- case None =>
- LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
- throw new NoSuchTableException
- }
- }
-
- /**
- * This method will search for a table in the catalog metadata
- *
- * @param database
- * @param tableName
- * @return
- */
- def getTableFromMetadata(database: String,
- tableName: String): Option[TableMeta] = {
- metadata.tablesMeta
- .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
- }
-
- def tableExists(identifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
- checkSchemasModifiedTimeAndReloadTables()
- val database = identifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
- val tables = metadata.tablesMeta.filter(
- c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(identifier.table))
- tables.nonEmpty
- }
-
- def loadMetadata(metadataPath: String): MetaData = {
- val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
- val statistic = new QueryStatistic()
- // create the zookeeper instance once, if zookeeper is configured as the carbon lock type
- val zookeeperurl = hiveContext.getConf(CarbonCommonConstants.ZOOKEEPER_URL, null)
- if (null != zookeeperurl) {
- CarbonProperties.getInstance
- .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
- }
- if (metadataPath == null) {
- return null
- }
- // if no lock type is configured and the store type is HDFS, set the HDFS lock as default
- if (null == CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
- FileType.HDFS == FileFactory.getFileType(metadataPath)) {
- CarbonProperties.getInstance
- .addProperty(CarbonCommonConstants.LOCK_TYPE,
- CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
- )
- LOGGER.info("Default lock type HDFSLOCK is configured")
- }
- val fileType = FileFactory.getFileType(metadataPath)
- val metaDataBuffer = new ArrayBuffer[TableMeta]
- fillMetaData(metadataPath, fileType, metaDataBuffer)
- updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
- statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
- System.currentTimeMillis())
- recorder.recordStatisticsForDriver(statistic, queryId)
- MetaData(metaDataBuffer)
- }
-
- private def fillMetaData(basePath: String, fileType: FileType,
- metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
- val databasePath = basePath // + "/schemas"
- try {
- if (FileFactory.isFileExist(databasePath, fileType)) {
- val file = FileFactory.getCarbonFile(databasePath, fileType)
- val databaseFolders = file.listFiles()
-
- databaseFolders.foreach(databaseFolder => {
- if (databaseFolder.isDirectory) {
- val dbName = databaseFolder.getName
- val tableFolders = databaseFolder.listFiles()
-
- tableFolders.foreach(tableFolder => {
- if (tableFolder.isDirectory) {
- val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
- tableFolder.getName, UUID.randomUUID().toString)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
- carbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
-
- if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
- val tableName = tableFolder.getName
- val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
-
-
- val createTBase = new ThriftReader.TBaseCreator() {
- override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
- new TableInfo()
- }
- }
- val thriftReader = new ThriftReader(tableMetadataFile, createTBase)
- thriftReader.open()
- val tableInfo: TableInfo = thriftReader.read().asInstanceOf[TableInfo]
- thriftReader.close()
-
- val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
- val schemaFilePath = CarbonStorePath
- .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
- wrapperTableInfo.setStorePath(storePath)
- wrapperTableInfo
- .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
- CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
- metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
- null, carbonTable)
- }
- }
- })
- }
- })
- } else {
- // Create folders and files.
- FileFactory.mkdirs(databasePath, fileType)
- }
- } catch {
- case s: java.io.FileNotFoundException =>
- // Create folders and files.
- FileFactory.mkdirs(databasePath, fileType)
- }
- }
-
- /**
- * Prepare the Thrift schema from the wrapper TableInfo and write it to the schema file,
- * then load the CarbonTable from the wrapper TableInfo.
- */
- def createTableFromThrift(
- tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
- dbName: String, tableName: String, partitioner: Partitioner)
- (sqlContext: SQLContext): String = {
- if (tableExists(TableIdentifier(tableName, Some(dbName)))(sqlContext)) {
- sys.error(s"Table [$tableName] already exists under Database [$dbName]")
- }
- val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val thriftTableInfo = schemaConverter
- .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
- val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
- thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
- .add(schemaEvolutionEntry)
-
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
- tableInfo.getFactTable.getTableId)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
- val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
- tableInfo.setMetaDataFilepath(schemaMetadataPath)
- tableInfo.setStorePath(storePath)
- CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- val tableMeta = new TableMeta(carbonTableIdentifier, storePath, null,
- CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName))
-
- val fileType = FileFactory.getFileType(schemaMetadataPath)
- if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
- FileFactory.mkdirs(schemaMetadataPath, fileType)
- }
-
- val thriftWriter = new ThriftWriter(schemaFilePath, false)
- thriftWriter.open()
- thriftWriter.write(thriftTableInfo)
- thriftWriter.close()
-
- metadata.tablesMeta += tableMeta
- logInfo(s"Table $tableName for Database $dbName created successfully.")
- LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
- carbonTablePath.getPath
- }
-
- private def updateMetadataByWrapperTable(
- wrapperTableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo): Unit = {
-
- CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
- wrapperTableInfo.getTableUniqueName)
- for (i <- metadata.tablesMeta.indices) {
- if (wrapperTableInfo.getTableUniqueName.equals(
- metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
- metadata.tablesMeta(i).carbonTable = carbonTable
- }
- }
- }
-
- def updateMetadataByThriftTable(schemaFilePath: String,
- tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
-
- tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
- .setTime_stamp(System.currentTimeMillis())
- val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
- wrapperTableInfo
- .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
- wrapperTableInfo.setStorePath(storePath)
- updateMetadataByWrapperTable(wrapperTableInfo)
- }
-
- /**
- * Shows all tables for given schema.
- */
- def getTables(databaseName: Option[String])(sqlContext: SQLContext): Seq[(String, Boolean)] = {
-
- val dbName =
- databaseName.getOrElse(sqlContext.asInstanceOf[HiveContext].catalog.client.currentDatabase)
- checkSchemasModifiedTimeAndReloadTables()
- metadata.tablesMeta.filter { c =>
- c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName)
- }.map { c => (c.carbonTableIdentifier.getTableName, false) }
- }
-
- def isTablePathExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
- val dbName = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
- val tableName = tableIdentifier.table
-
- val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
- new CarbonTableIdentifier(dbName, tableName, "")).getPath
-
- val fileType = FileFactory.getFileType(tablePath)
- FileFactory.isFileExist(tablePath, fileType)
- }
-
- def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
- (sqlContext: SQLContext) {
- val dbName = tableIdentifier.database.get
- val tableName = tableIdentifier.table
-
- val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
- new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
-
- val fileType = FileFactory.getFileType(metadataFilePath)
-
- if (FileFactory.isFileExist(metadataFilePath, fileType)) {
- // on drop, refresh the schema modified time so that changes made from another
- // beeline session are picked up
- checkSchemasModifiedTimeAndReloadTables()
- val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
- CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
- val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
- tableIdentifier.table)
- metadataToBeRemoved match {
- case Some(tableMeta) =>
- metadata.tablesMeta -= tableMeta
- CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
- case None =>
- logInfo(s"Metadata does not contain entry for table $tableName in database $dbName")
- }
- CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sqlContext)
- // discard cached table info in cachedDataSourceTables
- sqlContext.catalog.refreshTable(tableIdentifier)
- DataMapStoreManager.getInstance().
- clearDataMap(AbsoluteTableIdentifier.from(storePath, dbName, tableName))
- }
- }
-
- private def getTimestampFileAndType(databaseName: String, tableName: String) = {
- val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
- val timestampFileType = FileFactory.getFileType(timestampFile)
- (timestampFile, timestampFileType)
- }
-
- /**
- * This method will put the updated timestamp of schema file in the table modified time store map
- *
- * @param timeStamp
- */
- def updateSchemasUpdatedTime(timeStamp: Long) {
- tableModifiedTimeStore.put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, timeStamp)
- }
-
- /**
- * This method will read the last modified time of the empty schema timestamp file
- *
- * @param databaseName
- * @param tableName
- * @return
- */
- def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
- if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
- FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
- } else {
- System.currentTimeMillis()
- }
- }
-
- /**
- * This method will create the empty schema timestamp file if needed and update its modified time
- *
- * @param databaseName
- * @param tableName
- * @return
- */
- def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
- if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
- LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
- FileFactory.createNewFile(timestampFile, timestampFileType)
- }
- val systemTime = System.currentTimeMillis()
- FileFactory.getCarbonFile(timestampFile, timestampFileType)
- .setLastModifiedTime(systemTime)
- systemTime
- }
-
- def checkSchemasModifiedTimeAndReloadTables() {
- val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
- if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
- if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
- getLastModifiedTime ==
- tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
- refreshCache()
- }
- }
- }
-
- def refreshCache() {
- metadata.tablesMeta = loadMetadata(storePath).tablesMeta
- }
-
- def getSchemaLastUpdatedTime(databaseName: String, tableName: String): Long = {
- var schemaLastUpdatedTime = System.currentTimeMillis
- val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
- if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
- schemaLastUpdatedTime = FileFactory.getCarbonFile(timestampFile, timestampFileType)
- .getLastModifiedTime
- }
- schemaLastUpdatedTime
- }
-
- def createDatabaseDirectory(dbName: String) {
- val databasePath = storePath + File.separator + dbName
- val fileType = FileFactory.getFileType(databasePath)
- FileFactory.mkdirs(databasePath, fileType)
- }
-
- def dropDatabaseDirectory(dbName: String) {
- val databasePath = storePath + File.separator + dbName
- val fileType = FileFactory.getFileType(databasePath)
- if (FileFactory.isFileExist(databasePath, fileType)) {
- val dbPath = FileFactory.getCarbonFile(databasePath, fileType)
- CarbonUtil.deleteFoldersAndFiles(dbPath)
- }
- }
-
-}
-
-
-object CarbonMetastoreTypes extends RegexParsers {
- protected lazy val primitiveType: Parser[DataType] =
- "string" ^^^ StringType |
- "float" ^^^ FloatType |
- "int" ^^^ IntegerType |
- "tinyint" ^^^ ShortType |
- "short" ^^^ ShortType |
- "double" ^^^ DoubleType |
- "long" ^^^ LongType |
- "binary" ^^^ BinaryType |
- "boolean" ^^^ BooleanType |
- fixedDecimalType |
- "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
- "varchar\\((\\d+)\\)".r ^^^ StringType |
- "timestamp" ^^^ TimestampType |
- "date" ^^^ DateType |
- "char\\((\\d+)\\)".r ^^^ StringType
-
- protected lazy val fixedDecimalType: Parser[DataType] =
- "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
- case precision ~ scale =>
- DecimalType(precision.toInt, scale.toInt)
- }
-
- protected lazy val arrayType: Parser[DataType] =
- "array" ~> "<" ~> dataType <~ ">" ^^ {
- case tpe => ArrayType(tpe)
- }
-
- protected lazy val mapType: Parser[DataType] =
- "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
- case t1 ~ _ ~ t2 => MapType(t1, t2)
- }
-
- protected lazy val structField: Parser[StructField] =
- "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
- case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
- }
-
- protected lazy val structType: Parser[DataType] =
- "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
- case fields => StructType(fields)
- }
-
- protected lazy val dataType: Parser[DataType] =
- arrayType |
- mapType |
- structType |
- primitiveType
-
- def toDataType(metastoreType: String): DataType = {
- parseAll(dataType, metastoreType) match {
- case Success(result, _) => result
- case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType")
- }
- }
-
- def toMetastoreType(dt: DataType): String = {
- dt match {
- case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
- case StructType(fields) =>
- s"struct<${
- fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
- .mkString(",")
- }>"
- case StringType => "string"
- case FloatType => "float"
- case IntegerType => "int"
- case ShortType => "tinyint"
- case DoubleType => "double"
- case LongType => "bigint"
- case BinaryType => "binary"
- case BooleanType => "boolean"
- case DecimalType() => "decimal"
- case DateType => "date"
- case TimestampType => "timestamp"
- }
- }
-}
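
The CarbonMetastoreTypes object above round-trips Hive metastore type strings and Spark DataTypes with a small combinator parser. A hedged usage sketch, assuming the deleted object and Spark's sql types were still on the classpath:

    import org.apache.spark.sql.types._

    object MetastoreTypesExample extends App {
      // parse a metastore type string into Spark's DataType representation
      val dt: DataType =
        CarbonMetastoreTypes.toDataType("struct<name:string,scores:array<double>>")
      // dt == StructType(Seq(
      //   StructField("name", StringType, nullable = true),
      //   StructField("scores", ArrayType(DoubleType), nullable = true)))

      // toMetastoreType inverts the parse; note that ShortType maps back to
      // "tinyint", matching the "tinyint" ^^^ ShortType rule in primitiveType above
      val typeString: String = CarbonMetastoreTypes.toMetastoreType(dt)
      // typeString == "struct<name:string,scores:array<double>>"
    }
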
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
deleted file mode 100644
index 82c5f7f..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.CarbonSqlParser
-import org.apache.spark.sql.catalyst.ParserDialect
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
-private[spark] class CarbonSQLDialect(hiveContext: HiveContext) extends ParserDialect {
-
- @transient
- protected val sqlParser = new CarbonSqlParser
-
- override def parse(sqlText: String): LogicalPlan = {
-
- try {
- sqlParser.parse(sqlText)
- } catch {
- // MalformedCarbonCommandException must be rethrown directly
- // because Hive cannot parse carbon commands
- case ce: MalformedCarbonCommandException =>
- throw ce
- case _: Throwable =>
- HiveQl.parseSql(sqlText)
- }
- }
-}
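
The parse fall-through above is the essential behaviour of the deleted dialect: try the Carbon parser first, rethrow Carbon-specific syntax errors, and hand everything else to the Hive parser. A self-contained sketch in which Parser, Plan and MalformedCommandException are hypothetical stand-ins for the real types:

    object DialectFallThroughSketch {

      case class Plan(description: String)
      class MalformedCommandException(msg: String) extends Exception(msg)

      trait Parser { def parse(sql: String): Plan }

      def parse(carbon: Parser, hive: Parser, sqlText: String): Plan = {
        try {
          carbon.parse(sqlText)
        } catch {
          // a malformed carbon command must surface to the user; Hive cannot parse it
          case ce: MalformedCommandException => throw ce
          // any other failure means the text was not carbon syntax; let Hive try
          case _: Throwable => hive.parse(sqlText)
        }
      }
    }
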
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
deleted file mode 100644
index d3d699a..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ScalaUDF, _}
-import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan}
-import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SparkPlan}
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
-import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand}
-import org.apache.spark.sql.hive.execution.command._
-import org.apache.spark.sql.optimizer.CarbonDecoderRelation
-import org.apache.spark.sql.types.{IntegerType, StringType}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
-class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
-
- override def strategies: Seq[Strategy] = getStrategies
-
- val LOGGER = LogServiceFactory.getLogService("CarbonStrategies")
-
- def getStrategies: Seq[Strategy] = {
- val total = sqlContext.planner.strategies :+ CarbonTableScan
- total
- }
-
- /**
- * Carbon strategies for performing late materialization (decoding dictionary keys
- * as late as possible)
- */
- private[sql] object CarbonTableScan extends Strategy {
-
- def apply(plan: LogicalPlan): Seq[SparkPlan] = {
- plan match {
- case PhysicalOperation(projectList, predicates, l: LogicalRelation)
- if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
- if (isStarQuery(plan)) {
- carbonRawScanForStarQuery(projectList, predicates, l)(sqlContext) :: Nil
- } else {
- carbonRawScan(projectList, predicates, l)(sqlContext) :: Nil
- }
- case InsertIntoCarbonTable(relation: CarbonDatasourceRelation,
- _, child: LogicalPlan, overwrite, _) =>
- ExecutedCommand(LoadTableByInsert(relation, child, overwrite)) :: Nil
- case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
- CarbonDictionaryDecoder(relations,
- profile,
- aliasMap,
- planLater(child))(sqlContext) :: Nil
- case _ =>
- Nil
- }
- }
-
- /**
- * Create carbon scan
- */
- private def carbonRawScan(projectListRaw: Seq[NamedExpression],
- predicates: Seq[Expression],
- logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
-
- val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]
- val tableName: String =
- relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
- // Check whether any expressions are present in the project list; if so, they need to
- // be decoded as well.
-
- val projectList = projectListRaw.map {p =>
- p.transform {
- case CustomDeterministicExpression(exp) => exp
- }
- }.asInstanceOf[Seq[NamedExpression]]
- val newProjectList = projectList.map {
- case a@Alias(s: ScalaUDF, name)
- if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
- name.equalsIgnoreCase(
- CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
- AttributeReference(name, StringType, true)().withExprId(a.exprId)
- case a@Alias(s: ScalaUDF, name)
- if name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) =>
- val reference =
- AttributeReference(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
- StringType, true)().withExprId(a.exprId)
- val alias = a.transform {
- case s: ScalaUDF =>
- ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
- }.asInstanceOf[Alias]
- Alias(alias.child, alias.name)(alias.exprId, alias.qualifiers, alias.explicitMetadata)
- case other => other
- }
- val projectSet = AttributeSet(newProjectList.flatMap(_.references))
- val filterSet = AttributeSet(predicates.flatMap(_.references))
- val scan = CarbonScan(projectSet.toSeq,
- relation.carbonRelation,
- predicates)(sqlContext)
- newProjectList.map {
- case attr: AttributeReference =>
- case Alias(attr: AttributeReference, _) =>
- case others =>
- others.references.map { f =>
- val dictionary = relation.carbonRelation.metaData.dictionaryMap.get(f.name)
- if (dictionary.isDefined && dictionary.get) {
- scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference])
- }
- }
- }
- val scanWithDecoder =
- if (scan.attributesNeedToDecode.size() > 0) {
- val decoder = getCarbonDecoder(logicalRelation,
- sc,
- tableName,
- scan.attributesNeedToDecode.asScala.toSeq,
- scan)
- if (scan.unprocessedExprs.nonEmpty) {
- val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
- filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)
- } else {
- decoder
- }
- } else {
- scan
- }
-
- if (projectList.map(_.toAttribute) == scan.columnProjection &&
- projectSet.size == projectList.size &&
- filterSet.subsetOf(projectSet)) {
- // copied from spark pruneFilterProjectRaw
- // When it is possible to just use column pruning to get the right projection and
- // when the columns of this projection are enough to evaluate all filter conditions,
- // just do a scan with no extra project.
- scanWithDecoder
- } else {
- Project(newProjectList, scanWithDecoder)
- }
- }
-
- /**
- * Create carbon scan for star query
- */
- private def carbonRawScanForStarQuery(projectList: Seq[NamedExpression],
- predicates: Seq[Expression],
- logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
- val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]
- val tableName: String =
- relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
- // Check whether any expressions are present in the project list; if so, they need to
- // be decoded as well.
- val projectExprsNeedToDecode = new java.util.HashSet[Attribute]()
- val scan = CarbonScan(projectList.map(_.toAttribute),
- relation.carbonRelation,
- predicates,
- useUnsafeCoversion = false)(sqlContext)
- projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode)
- val updatedAttrs = scan.columnProjection.map(attr =>
- updateDataType(attr.asInstanceOf[AttributeReference], relation, projectExprsNeedToDecode))
- scan.columnProjection = updatedAttrs
- if (projectExprsNeedToDecode.size() > 0
- && isDictionaryEncoded(projectExprsNeedToDecode.asScala.toSeq, relation)) {
- val decoder = getCarbonDecoder(logicalRelation,
- sc,
- tableName,
- projectExprsNeedToDecode.asScala.toSeq,
- scan)
- if (scan.unprocessedExprs.nonEmpty) {
- val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
- filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)
- } else {
- decoder
- }
- } else {
- if (scan.unprocessedExprs.nonEmpty) {
- val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
- filterCondToAdd.map(Filter(_, scan)).getOrElse(scan)
- } else {
- scan
- }
- }
- }
-
- def getCarbonDecoder(logicalRelation: LogicalRelation,
- sc: SQLContext,
- tableName: String,
- projectExprsNeedToDecode: Seq[Attribute],
- scan: CarbonScan): CarbonDictionaryDecoder = {
- val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
- logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation])
- val attrs = projectExprsNeedToDecode.map { attr =>
- val newAttr = AttributeReference(attr.name,
- attr.dataType,
- attr.nullable,
- attr.metadata)(attr.exprId, Seq(tableName))
- relation.addAttribute(newAttr)
- newAttr
- }
- CarbonDictionaryDecoder(Seq(relation), IncludeProfile(attrs),
- CarbonAliasDecoderRelation(), scan)(sc)
- }
-
- def isDictionaryEncoded(projectExprsNeedToDecode: Seq[Attribute],
- relation: CarbonDatasourceRelation): Boolean = {
- projectExprsNeedToDecode.exists { attr =>
- relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false)
- }
- }
-
- def updateDataType(attr: AttributeReference,
- relation: CarbonDatasourceRelation,
- allAttrsNotDecode: util.Set[Attribute]): AttributeReference = {
- if (relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false) &&
- !allAttrsNotDecode.asScala.exists(p => p.name.equals(attr.name))) {
- AttributeReference(attr.name,
- IntegerType,
- attr.nullable,
- attr.metadata)(attr.exprId, attr.qualifiers)
- } else {
- attr
- }
- }
-
- private def isStarQuery(plan: LogicalPlan) = {
- plan match {
- case LogicalFilter(condition, l: LogicalRelation)
- if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
- true
- case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => true
- case _ => false
- }
- }
- }
-
- object DDLStrategies extends Strategy {
- def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case DropTable(tableName, ifNotExists)
- if CarbonEnv.get.carbonMetastore
- .isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) =>
- val identifier = toTableIdentifier(tableName.toLowerCase)
- ExecutedCommand(DropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil
- case ShowLoadsCommand(databaseName, table, limit) =>
- ExecutedCommand(ShowLoads(databaseName, table, limit, plan.output)) :: Nil
- case LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
- options, isOverwriteExist, inputSqlString, dataFrame, _) =>
- val isCarbonTable = CarbonEnv.get.carbonMetastore
- .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
- if (isCarbonTable || options.nonEmpty) {
- ExecutedCommand(LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
- options, isOverwriteExist, inputSqlString, dataFrame)) :: Nil
- } else {
- ExecutedCommand(HiveNativeCommand(inputSqlString)) :: Nil
- }
- case alterTable@AlterTableCompaction(altertablemodel) =>
- val isCarbonTable = CarbonEnv.get.carbonMetastore
- .tableExists(TableIdentifier(altertablemodel.tableName,
- altertablemodel.dbName))(sqlContext)
- if (isCarbonTable) {
- if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
- altertablemodel.compactionType.equalsIgnoreCase("major")) {
- ExecutedCommand(alterTable) :: Nil
- } else {
- throw new MalformedCarbonCommandException(
- "Unsupported alter operation on carbon table")
- }
- } else {
- ExecutedCommand(HiveNativeCommand(altertablemodel.alterSql)) :: Nil
- }
- case CreateDatabase(dbName, sql) =>
- ExecutedCommand(CreateDatabaseCommand(dbName, HiveNativeCommand(sql))) :: Nil
- case DropDatabase(dbName, isCascade, sql) =>
- if (isCascade) {
- ExecutedCommand(DropDatabaseCascadeCommand(dbName, HiveNativeCommand(sql))) :: Nil
- } else {
- ExecutedCommand(DropDatabaseCommand(dbName, HiveNativeCommand(sql))) :: Nil
- }
- case UseDatabase(sql) =>
- ExecutedCommand(HiveNativeCommand(sql)) :: Nil
- case d: HiveNativeCommand =>
- try {
- val resolvedTable = sqlContext.executePlan(CarbonHiveSyntax.parse(d.sql)).optimizedPlan
- planLater(resolvedTable) :: Nil
- } catch {
- case ce: MalformedCarbonCommandException =>
- throw ce
- case ae: AnalysisException =>
- throw ae
- case e: Exception => ExecutedCommand(d) :: Nil
- }
- case DescribeFormattedCommand(sql, tblIdentifier) =>
- val isTable = CarbonEnv.get.carbonMetastore
- .tableExists(tblIdentifier)(sqlContext)
- if (isTable) {
- val describe =
- LogicalDescribeCommand(UnresolvedRelation(tblIdentifier, None), isExtended = false)
- val resolvedTable = sqlContext.executePlan(describe.table).analyzed
- val resultPlan = sqlContext.executePlan(resolvedTable).executedPlan
- ExecutedCommand(DescribeCommandFormatted(resultPlan, plan.output, tblIdentifier)) :: Nil
- } else {
- ExecutedCommand(HiveNativeCommand(sql)) :: Nil
- }
- case ShowPartitions(t) =>
- val isCarbonTable = CarbonEnv.get.carbonMetastore
- .tableExists(t)(sqlContext)
- if (isCarbonTable) {
- ExecutedCommand(ShowCarbonPartitionsCommand(t)) :: Nil
- } else {
- val tableName = t.table
- val database = t.database
- // database is an Option, so it must be unwrapped before interpolation
- val sql = if (database.isEmpty) {
- s"show partitions $tableName"
- } else {
- s"show partitions ${database.get}.$tableName"
- }
- ExecutedCommand(HiveNativeCommand(sql)) :: Nil
- }
- case _ =>
- Nil
- }
-
- def toTableIdentifier(name: String): TableIdentifier = {
- val identifier = name.split("\\.")
- identifier match {
- case Array(tableName) => TableIdentifier(tableName, None)
- case Array(dbName, tableName) => TableIdentifier(tableName, Some(dbName))
- }
- }
- }
-
-}
-
-object CarbonHiveSyntax {
-
- @transient
- protected val sqlParser = new CarbonSqlParser
-
- def parse(sqlText: String): LogicalPlan = {
- sqlParser.parse(sqlText)
- }
-}
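
toTableIdentifier in DDLStrategies above splits a possibly database-qualified name into its parts. A minimal standalone sketch, with a plain case class standing in for catalyst's TableIdentifier:

    object TableIdentifierSketch extends App {

      case class TableIdentifier(table: String, database: Option[String])

      def toTableIdentifier(name: String): TableIdentifier =
        name.split("\\.") match {
          case Array(tableName) => TableIdentifier(tableName, None)
          case Array(dbName, tableName) => TableIdentifier(tableName, Some(dbName))
          // like the original, names with more than one dot fail with a MatchError
        }

      println(toTableIdentifier("t1"))    // TableIdentifier(t1,None)
      println(toTableIdentifier("db.t1")) // TableIdentifier(t1,Some(db))
    }
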
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/HiveQlWrapper.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/HiveQlWrapper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/HiveQlWrapper.scala
deleted file mode 100644
index 6b244f9..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/HiveQlWrapper.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.hive.ql.parse.ASTNode
-
-/**
- * Wrapper object for accessing Hive's HiveQl parser.
- */
-object HiveQlWrapper {
-
- def getAst(sql: String): ASTNode = {
-
- HiveQl.getAst(sql)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/cli/CarbonSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/cli/CarbonSQLCLIDriver.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/cli/CarbonSQLCLIDriver.scala
deleted file mode 100644
index ad70027..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/cli/CarbonSQLCLIDriver.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive.cli
-
-import java.io.File
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.scheduler.StatsReportListener
-import org.apache.spark.sql.CarbonContext
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.hive.thriftserver.{SparkSQLCLIDriver, SparkSQLEnv}
-import org.apache.spark.util.Utils
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-
-object CarbonSQLCLIDriver {
-
- private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- var hiveContext: HiveContext = _
- var sparkContext: SparkContext = _
-
- def main(args: Array[String]): Unit = {
- init()
- SparkSQLEnv.sparkContext = sparkContext
- SparkSQLEnv.hiveContext = hiveContext
- SparkSQLCLIDriver.installSignalHandler()
- SparkSQLCLIDriver.main(args)
- }
-
- def init() {
- if (hiveContext == null) {
- val sparkConf = new SparkConf(loadDefaults = true)
- val maybeSerializer = sparkConf.getOption("spark.serializer")
- val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking")
- // If the user doesn't specify an appName, use [CarbonSparkSQL::localHostName] instead of
- // the default appName [CarbonSQLCLIDriver] in cli or beeline.
- val maybeAppName = sparkConf
- .getOption("spark.app.name")
- .filterNot(_ == classOf[SparkSQLCLIDriver].getName)
- val maybeStorePath = sparkConf.getOption("spark.carbon.storepath")
-
- sparkConf
- .setAppName(maybeAppName.getOrElse(s"CarbonSparkSQL::${ Utils.localHostName() }"))
- .set(
- "spark.serializer",
- maybeSerializer.getOrElse("org.apache.spark.serializer.KryoSerializer"))
- .set(
- "spark.kryo.referenceTracking",
- maybeKryoReferenceTracking.getOrElse("false"))
-
- sparkContext = new SparkContext(sparkConf)
- sparkContext.addSparkListener(new StatsReportListener())
- val path = System.getenv("CARBON_HOME") + "/bin/carbonsqlclistore"
- val store = new File(path)
- store.mkdirs()
- hiveContext = new CarbonContext(sparkContext,
- maybeStorePath.getOrElse(store.getCanonicalPath),
- store.getCanonicalPath)
-
- hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
- hiveContext.hiveconf.getAllProperties.asScala.toSeq.sorted.foreach { case (k, v) =>
- LOGGER.debug(s"HiveConf var: $k=$v")
- }
- }
- }
-
-}
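
CarbonSQLCLIDriver.init above follows a "respect user settings, otherwise apply Carbon defaults" pattern on SparkConf. A sketch of just that pattern, with a plain Map as a hypothetical stand-in for SparkConf so it runs without Spark on the classpath:

    object ConfDefaultsSketch extends App {

      // hypothetical user-supplied configuration (normally read from SparkConf)
      val userConf = Map("spark.app.name" -> "MyShell")

      // set a key only when the user has not already done so
      def withDefault(conf: Map[String, String], key: String, default: String): Map[String, String] =
        if (conf.contains(key)) conf else conf.updated(key, default)

      val conf = withDefault(
        withDefault(userConf, "spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
        "spark.kryo.referenceTracking", "false")

      conf.toSeq.sorted.foreach { case (k, v) => println(s"$k=$v") }
    }
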
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
deleted file mode 100644
index 0f42940..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution.command
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.execution.command.DropTableCommand
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
-
-private[hive] case class CreateDatabaseCommand(dbName: String,
- command: HiveNativeCommand) extends RunnableCommand {
- def run(sqlContext: SQLContext): Seq[Row] = {
- val rows = command.run(sqlContext)
- CarbonEnv.get.carbonMetastore.createDatabaseDirectory(dbName)
- rows
- }
-}
-
-private[hive] case class DropDatabaseCommand(dbName: String,
- command: HiveNativeCommand) extends RunnableCommand {
- def run(sqlContext: SQLContext): Seq[Row] = {
- val rows = command.run(sqlContext)
- CarbonEnv.get.carbonMetastore.dropDatabaseDirectory(dbName)
- rows
- }
-}
-
-private[hive] case class DropDatabaseCascadeCommand(dbName: String,
- command: HiveNativeCommand) extends RunnableCommand {
- def run(sqlContext: SQLContext): Seq[Row] = {
- val tablesInDB = CarbonEnv.get.carbonMetastore
- .getTables(Some(dbName))(sqlContext).map(x => x._1)
- val rows = command.run(sqlContext)
- tablesInDB.foreach{tableName =>
- DropTableCommand(true, Some(dbName), tableName).run(sqlContext)
- }
- CarbonEnv.get.carbonMetastore.dropDatabaseDirectory(dbName)
- rows
- }
-}
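
Each command above delegates to the wrapped HiveNativeCommand first and then performs the matching Carbon store-directory side effect. A self-contained sketch of that delegate-then-side-effect shape, where Command and Metastore are hypothetical stand-ins for HiveNativeCommand and CarbonMetastore:

    object DatabaseCommandSketch {

      trait Command { def run(): Seq[String] }

      trait Metastore {
        def createDatabaseDirectory(db: String): Unit
        def dropDatabaseDirectory(db: String): Unit
      }

      case class CreateDatabase(db: String, delegate: Command, store: Metastore) extends Command {
        def run(): Seq[String] = {
          val rows = delegate.run()         // Hive creates the database first
          store.createDatabaseDirectory(db) // then Carbon creates the store folder
          rows
        }
      }
    }
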
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
deleted file mode 100644
index 2e45954..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ /dev/null
@@ -1,431 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, StartsWith, _}
-import org.apache.spark.sql.optimizer.AttributeReferenceWrapper
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
-import org.apache.carbondata.core.scan.expression.conditional.{GreaterThanEqualToExpression, LessThanExpression, _}
-import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * All filter conversions are done here.
- */
-object CarbonFilters {
-
- /**
- * Converts data sources filters to carbon filter predicates.
- */
- def createCarbonFilter(schema: StructType,
- predicate: sources.Filter): Option[CarbonExpression] = {
- val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
-
- def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
- predicate match {
-
- case sources.EqualTo(name, value) =>
- Some(new EqualToExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
- case sources.Not(sources.EqualTo(name, value)) =>
- Some(new NotEqualsExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
-
- case sources.EqualNullSafe(name, value) =>
- Some(new EqualToExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
- case sources.Not(sources.EqualNullSafe(name, value)) =>
- Some(new NotEqualsExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
-
- case sources.GreaterThan(name, value) =>
- Some(new GreaterThanExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
- case sources.LessThan(name, value) =>
- Some(new LessThanExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
- case sources.GreaterThanOrEqual(name, value) =>
- Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
- case sources.LessThanOrEqual(name, value) =>
- Some(new LessThanEqualToExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
-
- case sources.In(name, values) =>
- Some(new InExpression(getCarbonExpression(name),
- new ListExpression(
- convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
- case sources.Not(sources.In(name, values)) =>
- Some(new NotInExpression(getCarbonExpression(name),
- new ListExpression(
- convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
-
- case sources.IsNull(name) =>
- Some(new EqualToExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, null), true))
- case sources.IsNotNull(name) =>
- Some(new NotEqualsExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, null), true))
-
- case sources.And(lhs, rhs) =>
- (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
-
- case sources.Or(lhs, rhs) =>
- for {
- lhsFilter <- createFilter(lhs)
- rhsFilter <- createFilter(rhs)
- } yield {
- new OrExpression(lhsFilter, rhsFilter)
- }
- case sources.StringStartsWith(name, value) if value.length > 0 =>
- val l = new GreaterThanEqualToExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value))
- val maxValueLimit = value.substring(0, value.length - 1) +
- (value.charAt(value.length - 1).toInt + 1).toChar
- val r = new LessThanExpression(
- getCarbonExpression(name), getCarbonLiteralExpression(name, maxValueLimit))
- Some(new AndExpression(l, r))
- case _ => None
- }
- }
-
- def getCarbonExpression(name: String) = {
- new CarbonColumnExpression(name,
- CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
- }
-
- def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
- val dataTypeOfAttribute = CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name))
- val dataType = if (Option(value).isDefined
- && dataTypeOfAttribute == CarbonDataTypes.STRING
- && value.isInstanceOf[Double]) {
- CarbonDataTypes.DOUBLE
- } else {
- dataTypeOfAttribute
- }
- new CarbonLiteralExpression(value, dataType)
- }
-
- createFilter(predicate)
- }
-
-
- // Check which filters can be pushed down to carbon; the rest are handled in the spark layer.
- // Mostly only dimension filters are pushed down, since they are faster in carbon.
- // TODO - The filters are first converted to intermediate sources.Filter expressions and then
- // converted back to CarbonExpression. This two-step evaluation could be merged
- // into a single step.
- def selectFilters(filters: Seq[Expression],
- attrList: java.util.HashSet[AttributeReferenceWrapper],
- aliasMap: CarbonAliasDecoderRelation): Unit = {
- def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = {
- expr match {
- case orExpr @ Or(left, right) =>
- val leftFilter = translate(left, or = true)
- val rightFilter = translate(right, or = true)
- if (leftFilter.isDefined && rightFilter.isDefined) {
- Some(sources.Or(leftFilter.get, rightFilter.get))
- } else {
- orExpr.collect {
- case attr: AttributeReference =>
- attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- None
- }
-
- case And(left, right) =>
- val leftFilter = translate(left, or)
- val rightFilter = translate(right, or)
- if (or) {
- if (leftFilter.isDefined && rightFilter.isDefined) {
- (leftFilter ++ rightFilter).reduceOption(sources.And)
- } else {
- None
- }
- } else {
- (leftFilter ++ rightFilter).reduceOption(sources.And)
- }
-
- case EqualTo(a: Attribute, Literal(v, t)) =>
- Some(sources.EqualTo(a.name, v))
- case EqualTo(l@Literal(v, t), a: Attribute) =>
- Some(sources.EqualTo(a.name, v))
- case Not(EqualTo(a: Attribute, Literal(v, t))) =>
- Some(sources.Not(sources.EqualTo(a.name, v)))
- case Not(EqualTo(Literal(v, t), a: Attribute)) =>
- Some(sources.Not(sources.EqualTo(a.name, v)))
- case IsNotNull(a: Attribute) =>
- Some(sources.IsNotNull(a.name))
- case IsNull(a: Attribute) =>
- Some(sources.IsNull(a.name))
- case Not(In(a: Attribute, list)) if list.forall(_.isInstanceOf[Literal]) =>
- val hSet = list.map(e => e.eval(EmptyRow))
- Some(sources.Not(sources.In(a.name, hSet.toArray)))
- case In(a: Attribute, list) if list.forall(_.isInstanceOf[Literal]) =>
- val hSet = list.map(e => e.eval(EmptyRow))
- Some(sources.In(a.name, hSet.toArray))
- case GreaterThan(a: Attribute, Literal(v, t)) =>
- Some(sources.GreaterThan(a.name, v))
- case GreaterThan(Literal(v, t), a: Attribute) =>
- Some(sources.LessThan(a.name, v))
- case LessThan(a: Attribute, Literal(v, t)) =>
- Some(sources.LessThan(a.name, v))
- case LessThan(Literal(v, t), a: Attribute) =>
- Some(sources.GreaterThan(a.name, v))
- case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
- Some(sources.GreaterThanOrEqual(a.name, v))
- case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
- Some(sources.LessThanOrEqual(a.name, v))
- case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
- Some(sources.LessThanOrEqual(a.name, v))
- case LessThanOrEqual(Literal(v, t), a: Attribute) =>
- Some(sources.GreaterThanOrEqual(a.name, v))
- case StartsWith(a: Attribute, Literal(v, t)) =>
- Some(sources.StringStartsWith(a.name, v.toString))
-
- case others =>
- if (!or) {
- others.collect {
- case attr: AttributeReference =>
- attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- }
- None
- }
- }
- // Only the side effect of populating attrList matters here; the translated filters are
- // discarded.
- filters.foreach(translate(_, false))
- }
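
translate encodes an asymmetry worth noting: under an Or, both children must be translatable or the whole predicate is abandoned (pushing only one branch of an OR would drop rows), while under an And a translatable child may be pushed alone and the rest left to Spark. A toy sketch of that rule, using stand-in types rather than the Spark or Carbon APIs:

    object PushdownRuleSketch {
      sealed trait Pred
      case class Eq(col: String, value: Any) extends Pred
      case class And(left: Pred, right: Pred) extends Pred
      case class Or(left: Pred, right: Pred) extends Pred
      case class Unsupported(description: String) extends Pred

      // OR is all-or-nothing; AND may push partially, except when it sits under an OR.
      def translate(p: Pred, underOr: Boolean = false): Option[Pred] = p match {
        case Or(left, right) =>
          for {
            l <- translate(left, underOr = true)
            r <- translate(right, underOr = true)
          } yield Or(l, r)
        case And(left, right) =>
          val parts = (translate(left, underOr) ++ translate(right, underOr)).toSeq
          if (underOr && parts.size < 2) None else parts.reduceOption(And)
        case eq: Eq => Some(eq)
        case _: Unsupported => None
      }

      def main(args: Array[String]): Unit = {
        println(translate(And(Eq("a", 1), Unsupported("udf(b)")))) // Some(Eq(a,1)): partial AND is safe
        println(translate(Or(Eq("a", 1), Unsupported("udf(b)"))))  // None: a partial OR would drop rows
      }
    }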
-
- def isCarbonSupportedDataTypes(expr: Expression): Boolean = {
- expr.dataType match {
- case StringType => true
- case IntegerType => true
- case LongType => true
- case DoubleType => true
- case FloatType => true
- case BooleanType => true
- case TimestampType => true
- case ArrayType(_, _) => true
- case StructType(_) => true
- case DecimalType() => true
- case _ => false
- }
- }
-
- def processExpression(exprs: Seq[Expression],
- attributesNeedToDecode: java.util.HashSet[AttributeReference],
- unprocessedExprs: ArrayBuffer[Expression],
- carbonTable: CarbonTable): Option[CarbonExpression] = {
- def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
- expr match {
- case orFilter@ Or(left, right)
- if (isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right)) =>
- val leftFilter = transformExpression(left, or = true)
- val rightFilter = transformExpression(right, or = true)
- if (leftFilter.isDefined && rightFilter.isDefined) {
- Some(new OrExpression(leftFilter.get, rightFilter.get))
- } else {
- if (!or) {
- orFilter.collect {
- case attr: AttributeReference => attributesNeedToDecode.add(attr)
- }
- unprocessedExprs += orFilter
- }
- None
- }
-
- case And(left, right) if (isCarbonSupportedDataTypes(left) &&
- isCarbonSupportedDataTypes(right)) =>
- val leftFilter = transformExpression(left, or)
- val rightFilter = transformExpression(right, or)
- if (or) {
- if (leftFilter.isDefined && rightFilter.isDefined) {
- (leftFilter ++ rightFilter).reduceOption(new AndExpression(_, _))
- } else {
- None
- }
- } else {
- (leftFilter ++ rightFilter).reduceOption(new AndExpression(_, _))
- }
-
-
- case EqualTo(a: Attribute, l@Literal(v, t)) if (isCarbonSupportedDataTypes(a) &&
- isCarbonSupportedDataTypes(l)) =>
- Some(
- new EqualToExpression(
- transformExpression(a).get,
- transformExpression(l).get
- )
- )
- case EqualTo(l@Literal(v, t), a: Attribute) if (isCarbonSupportedDataTypes(l) &&
- isCarbonSupportedDataTypes(a)) =>
- Some(
- new EqualToExpression(
- transformExpression(a).get,
- transformExpression(l).get
- )
- )
-
- case Not(EqualTo(a: Attribute, l@Literal(v, t))) if (isCarbonSupportedDataTypes(a) &&
- isCarbonSupportedDataTypes(l)) =>
- Some(
- new NotEqualsExpression(
- transformExpression(a).get,
- transformExpression(l).get
- )
- )
- case Not(EqualTo(l@Literal(v, t), a: Attribute)) if (isCarbonSupportedDataTypes(l) &&
- isCarbonSupportedDataTypes(a)) =>
- Some(
- new NotEqualsExpression(
- transformExpression(a).get,
- transformExpression(l).get
- )
- )
- case IsNotNull(child: Attribute) if (isCarbonSupportedDataTypes(child)) =>
- Some(new NotEqualsExpression(transformExpression(child).get,
- transformExpression(Literal(null)).get, true))
- case IsNull(child: Attribute) if (isCarbonSupportedDataTypes(child)) =>
- Some(new EqualToExpression(transformExpression(child).get,
- transformExpression(Literal(null)).get, true))
- case Not(In(a: Attribute, list))
- if list.forall(_.isInstanceOf[Literal]) && isCarbonSupportedDataTypes(a) =>
- if (list.exists(isNullLiteral)) {
- Some(new FalseExpression(transformExpression(a).get))
- } else {
- Some(new NotInExpression(transformExpression(a).get,
- new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
- }
- case In(a: Attribute, list) if list.forall(_.isInstanceOf[Literal]) &&
- isCarbonSupportedDataTypes(a) =>
- Some(new InExpression(transformExpression(a).get,
- new ListExpression(convertToJavaList(list
- .map(transformExpression(_).get)))))
-
- case GreaterThan(a: Attribute, l@Literal(v, t))
- if (isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l)) =>
- Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
- case GreaterThan(l@Literal(v, t), a: Attribute)
- if (isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a)) =>
- Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-
- case LessThan(a: Attribute, l@Literal(v, t))
- if (isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l)) =>
- Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
- case LessThan(l@Literal(v, t), a: Attribute)
- if (isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a)) =>
- Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-
- case GreaterThanOrEqual(a: Attribute, l@Literal(v, t))
- if (isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l)) =>
- Some(new GreaterThanEqualToExpression(transformExpression(a).get,
- transformExpression(l).get))
- case GreaterThanOrEqual(l@Literal(v, t), a: Attribute)
- if (isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a)) =>
- Some(new LessThanEqualToExpression(transformExpression(a).get,
- transformExpression(l).get))
-
- case LessThanOrEqual(a: Attribute, l@Literal(v, t))
- if (isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l)) =>
- Some(new LessThanEqualToExpression(transformExpression(a).get,
- transformExpression(l).get))
- case LessThanOrEqual(l@Literal(v, t), a: Attribute)
- if (isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a)) =>
- Some(new GreaterThanEqualToExpression(transformExpression(a).get,
- transformExpression(l).get))
-
- case AttributeReference(name, dataType, _, _) =>
- Some(new CarbonColumnExpression(name,
- CarbonScalaUtil.convertSparkToCarbonDataType(
- getActualCarbonDataType(name, carbonTable))))
- case Literal(value, dataType) =>
- Some(new CarbonLiteralExpression(value,
- CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
- case StartsWith(left: Attribute, right @ Literal(pattern, dataType))
- if pattern.toString.nonEmpty &&
- isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right) =>
- val l = new GreaterThanEqualToExpression(transformExpression(left).get,
- transformExpression(right).get)
- val maxValueLimit = pattern.toString.substring(0, pattern.toString.length - 1) +
- (pattern.toString.charAt(pattern.toString.length - 1).toInt + 1).toChar
- val r = new LessThanExpression(
- transformExpression(left).get,
- new CarbonLiteralExpression(maxValueLimit,
- CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
- Some(new AndExpression(l, r))
- case others =>
- if (!or) {
- others.collect {
- case attr: AttributeReference => attributesNeedToDecode.add(attr)
- }
- unprocessedExprs += others
- }
- None
- }
- }
- exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _))
- }
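
For reference, this is roughly the CarbonExpression tree that processExpression builds for a simple equality predicate such as name = 'bob'. A short sketch, assuming carbondata-core is on the classpath and that the Carbon expression classes live in their usual packages (ColumnExpression and LiteralExpression are the classes aliased as CarbonColumnExpression and CarbonLiteralExpression in this file):

    import org.apache.carbondata.core.metadata.datatype.DataTypes
    import org.apache.carbondata.core.scan.expression.{ColumnExpression, LiteralExpression}
    import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression

    object EqualToTreeSketch {
      def main(args: Array[String]): Unit = {
        // name = 'bob' -> EqualToExpression(ColumnExpression(name), LiteralExpression(bob))
        val column = new ColumnExpression("name", DataTypes.STRING)
        val literal = new LiteralExpression("bob", DataTypes.STRING)
        val equalTo = new EqualToExpression(column, literal)
        println(equalTo.getString) // prints the expression tree in Carbon's string form
      }
    }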
- // A literal counts as null if it has NullType or carries a null value.
- private def isNullLiteral(exp: Expression): Boolean = {
- exp match {
- case literal: Literal =>
- literal.dataType == org.apache.spark.sql.types.DataTypes.NullType || literal.value == null
- case _ => false
- }
- }
- // Resolves the column's Carbon data type (looked up as a dimension first, then as a
- // measure) and returns the equivalent Spark data type.
- private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = {
- var carbonColumn: CarbonColumn =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, column)
- val dataType = if (carbonColumn != null) {
- carbonColumn.getDataType
- } else {
- carbonColumn = carbonTable.getMeasureByName(carbonTable.getFactTableName, column)
- carbonColumn.getDataType match {
- case CarbonDataTypes.INT => CarbonDataTypes.INT
- case CarbonDataTypes.SHORT => CarbonDataTypes.SHORT
- case CarbonDataTypes.LONG => CarbonDataTypes.LONG
- case CarbonDataTypes.DECIMAL => CarbonDataTypes.DECIMAL
- case _ => CarbonDataTypes.DOUBLE
- }
- }
- CarbonScalaUtil.convertCarbonToSparkDataType(dataType)
- }
-
- // Converts a Scala list to a Java list. scalaList.asJava cannot be used here: it returns a
- // Scala wrapper class that may not be resolvable while deserializing on the receiving side,
- // which results in a ClassNotFoundException.
- private def convertToJavaList(
- scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
- val javaList = new java.util.ArrayList[CarbonExpression]()
- scalaList.foreach(javaList.add)
- javaList
- }
-}
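
The ClassNotFoundException mentioned above stems from the fact that scalaList.asJava copies nothing: it returns a Scala wrapper object whose class must also be resolvable wherever the expression tree is later deserialized. Copying into a java.util.ArrayList keeps the serialized form free of Scala collection classes. A small, self-contained illustration (plain Scala, in-process serialization standing in for shipping the tree to another JVM):

    import java.io._
    import scala.collection.JavaConverters._

    object JavaListCopySketch {
      // Serializes and deserializes a value, mimicking sending it to another JVM.
      private def roundTrip(v: AnyRef): AnyRef = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(v)
        out.close()
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)).readObject()
      }

      def main(args: Array[String]): Unit = {
        val wrapped = List("a", "b").asJava // a Scala wrapper, not a plain Java collection
        println(wrapped.getClass.getName)   // e.g. scala.collection.convert.Wrappers$SeqWrapper

        val copied = new java.util.ArrayList[String]()
        List("a", "b").foreach(copied.add)
        println(roundTrip(copied).getClass.getName) // java.util.ArrayList, resolvable anywhere
      }
    }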