Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/05 12:40:12 UTC
[4/8] carbondata git commit: [CARBONDATA-2532][Integration] Carbon to
support Spark 2.3.1 version (make API changes in Carbon to be compatible with
Spark 2.3)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
deleted file mode 100644
index dfb89fd..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.CarbonReflectionUtils
-
-class CarbonAnalyzer(catalog: SessionCatalog,
- conf: SQLConf,
- sparkSession: SparkSession,
- analyzer: Analyzer) extends Analyzer(catalog, conf) {
-
- val mvPlan = try {
- CarbonReflectionUtils.createObject(
- "org.apache.carbondata.mv.datamap.MVAnalyzerRule",
- sparkSession)._1.asInstanceOf[Rule[LogicalPlan]]
- } catch {
- case e: Exception =>
- null
- }
-
- override def execute(plan: LogicalPlan): LogicalPlan = {
- var logicalPlan = analyzer.execute(plan)
- logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
- logicalPlan = CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
- if (mvPlan != null) {
- mvPlan.apply(logicalPlan)
- } else {
- logicalPlan
- }
- }
-}
\ No newline at end of file
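
The removed CarbonAnalyzer above illustrates the wrapping pattern this commit relies on: delegate resolution to Spark's own Analyzer, then post-process the resolved plan with Carbon rules. A minimal sketch of that delegation, with DelegatingAnalyzer and extraRules as illustrative names rather than anything in this commit:

    import org.apache.spark.sql.catalyst.analysis.Analyzer
    import org.apache.spark.sql.catalyst.catalog.SessionCatalog
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.internal.SQLConf

    // Delegate to the inner analyzer, then fold the extra rules over the
    // resolved plan, the same shape as execute() above uses for the
    // pre-aggregate and MV rules.
    class DelegatingAnalyzer(
        catalog: SessionCatalog,
        conf: SQLConf,
        inner: Analyzer,
        extraRules: Seq[Rule[LogicalPlan]]) extends Analyzer(catalog, conf) {

      override def execute(plan: LogicalPlan): LogicalPlan = {
        extraRules.foldLeft(inner.execute(plan)) { (p, rule) => rule.apply(p) }
      }
    }
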
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
deleted file mode 100644
index ba6aae5..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.CarbonSparkSqlParser
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
-
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * This class holds the Carbon catalog and refreshes the relation from the cache if the
- * CarbonTable in the Carbon catalog is not the same as the cached Carbon relation's CarbonTable.
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class InMemorySessionCatalog(
- externalCatalog: ExternalCatalog,
- globalTempViewManager: GlobalTempViewManager,
- functionRegistry: FunctionRegistry,
- sparkSession: SparkSession,
- conf: SQLConf,
- hadoopConf: Configuration,
- parser: ParserInterface,
- functionResourceLoader: FunctionResourceLoader)
- extends SessionCatalog(
- externalCatalog,
- globalTempViewManager,
- functionRegistry,
- conf,
- hadoopConf,
- parser,
- functionResourceLoader
- ) with CarbonSessionCatalog {
-
- override def alterTableRename(oldTableIdentifier: TableIdentifier,
- newTableIdentifier: TableIdentifier,
- newTablePath: String): Unit = {
- sparkSession.sessionState.catalog.renameTable(oldTableIdentifier, newTableIdentifier)
- }
-
- override def alterTable(tableIdentifier: TableIdentifier,
- schemaParts: String,
- cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
- : Unit = {
- // Not required in case of the in-memory catalog.
- }
-
- override def alterAddColumns(tableIdentifier: TableIdentifier,
- schemaParts: String,
- newColumns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
- : Unit = {
- val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
- val structType = catalogTable.schema
- var newStructType = structType
- newColumns.get.foreach { col =>
- // Accumulate onto the evolving schema, not the original, so that
- // adding multiple columns keeps all of them.
- newStructType = newStructType
- .add(col.getColumnName,
- CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(col.getDataType))
- }
- alterSchema(newStructType, catalogTable, tableIdentifier)
- }
-
- override def alterDropColumns(tableIdentifier: TableIdentifier,
- schemaParts: String,
- dropCols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
- : Unit = {
- val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
- val fields = catalogTable.schema.fields.filterNot { field =>
- dropCols.get.exists { col =>
- col.getColumnName.equalsIgnoreCase(field.name)
- }
- }
- alterSchema(new StructType(fields), catalogTable, tableIdentifier)
- }
-
- override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
- schemaParts: String,
- columns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
- : Unit = {
- val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
- val updatedFields = catalogTable.schema.fields.flatMap { field =>
- columns.get.map { col =>
- if (col.getColumnName.equalsIgnoreCase(field.name)) {
- StructField(col.getColumnName,
- CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(col.getDataType))
- } else {
- field
- }
- }
- }
- alterSchema(new StructType(updatedFields), catalogTable, tableIdentifier)
- }
-
- private def alterSchema(structType: StructType,
- catalogTable: CatalogTable,
- tableIdentifier: TableIdentifier): Unit = {
- val copy = catalogTable.copy(schema = structType)
- sparkSession.sessionState.catalog.alterTable(copy)
- sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
- }
-
- lazy val carbonEnv = {
- val env = new CarbonEnv
- env.init(sparkSession)
- env
- }
-
- def getCarbonEnv() : CarbonEnv = {
- carbonEnv
- }
-
- // Initialize all listeners to the Operation bus.
- CarbonEnv.initListeners()
-
- def getThriftTableInfo(tablePath: String): TableInfo = {
- val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
- CarbonUtil.readSchemaFile(tableMetadataFile)
- }
-
- override def lookupRelation(name: TableIdentifier): LogicalPlan = {
- val rtnRelation = super.lookupRelation(name)
- val isRelationRefreshed =
- CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
- if (isRelationRefreshed) {
- super.lookupRelation(name)
- } else {
- rtnRelation
- }
- }
-
- /**
- * Returns the Hive client; the in-memory catalog has none, so this returns null.
- *
- * @return
- */
- def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
- null
- }
-
- override def createPartitions(
- tableName: TableIdentifier,
- parts: Seq[CatalogTablePartition],
- ignoreIfExists: Boolean): Unit = {
- try {
- val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
- val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
- super.createPartitions(tableName, updatedParts, ignoreIfExists)
- } catch {
- case e: Exception =>
- super.createPartitions(tableName, parts, ignoreIfExists)
- }
- }
-
- /**
- * This is an alternate way of getting partition information. It first fetches all partitions
- * from Hive and then applies the filter, instead of querying Hive with the filters.
- * @param partitionFilters
- * @param sparkSession
- * @param identifier
- * @return
- */
- override def getPartitionsAlternate(partitionFilters: Seq[Expression],
- sparkSession: SparkSession,
- identifier: TableIdentifier) = {
- CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
- }
-
- /**
- * Update the storage format with the new location information.
- */
- override def updateStorageLocation(
- path: Path,
- storage: CatalogStorageFormat,
- newTableName: String,
- dbName: String): CatalogStorageFormat = {
- storage.copy(locationUri = Some(path.toUri))
- }
-}
-
-class CarbonInMemorySessionStateBuilder (sparkSession: SparkSession,
- parentState: Option[SessionState] = None)
- extends SessionStateBuilder(sparkSession, parentState) {
-
- override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
- experimentalMethods.extraStrategies =
- Seq(new StreamingTableStrategy(sparkSession),
- new CarbonLateDecodeStrategy,
- new DDLStrategy(sparkSession)
- )
- experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
- new CarbonUDFTransformRule,
- new CarbonLateDecodeRule)
-
- /**
- * Internal catalog for managing table and database states.
- */
- override protected lazy val catalog: InMemorySessionCatalog = {
- val catalog = new InMemorySessionCatalog(
- externalCatalog,
- session.sharedState.globalTempViewManager,
- functionRegistry,
- sparkSession,
- conf,
- SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
- sqlParser,
- resourceLoader)
- parentState.foreach(_.catalog.copyStateTo(catalog))
- catalog
- }
-
- private def externalCatalog: ExternalCatalog =
- session.sharedState.externalCatalog.asInstanceOf[ExternalCatalog]
-
- override protected lazy val resourceLoader: SessionResourceLoader = {
- new SessionResourceLoader(session)
- }
-
- override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
- override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
- new Analyzer(catalog, conf) {
- override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
- new FindDataSourceTable(session) +:
- new ResolveSQLOnFile(session) +:
- new CarbonIUDAnalysisRule(sparkSession) +:
- new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
- override val extendedCheckRules: Seq[LogicalPlan => Unit] =
- PreWriteCheck :: HiveOnlyCheck :: Nil
- override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
- PreprocessTableCreation(session) +:
- PreprocessTableInsertion(conf) +:
- DataSourceAnalysis(conf) +:
- customPostHocResolutionRules
- }
- )
- override protected def newBuilder: NewBuilder = new CarbonInMemorySessionStateBuilder(_, _)
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
index a046763..2f9ad3e 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -17,12 +17,10 @@
package org.apache.spark.sql.hive
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, ExperimentalMethods}
+import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.expressions.{Exists, In, ListQuery, ScalarSubquery}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkOptimizer
-import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SQLConf
@@ -37,41 +35,3 @@ class CarbonOptimizer(
super.execute(transFormedPlan)
}
}
-
-object CarbonOptimizerUtil {
- def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
- // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And
- // optimize whole plan at once.
- val transFormedPlan = plan.transform {
- case filter: Filter =>
- filter.transformExpressions {
- case s: ScalarSubquery =>
- val tPlan = s.plan.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- ScalarSubquery(tPlan, s.children, s.exprId)
- case e: Exists =>
- val tPlan = e.plan.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
-
- case In(value, Seq(l@ListQuery(sub, _, exprId))) =>
- val tPlan = sub.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- In(value, Seq(ListQuery(tPlan, l.children, exprId)))
- }
- }
- transFormedPlan
- }
-}
\ No newline at end of file
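
The CarbonOptimizerUtil body removed above tags CarbonDatasourceHadoopRelation instances nested inside subquery expressions so the decoder rule can skip them; its Exists/ListQuery pattern matches depend on version-specific signatures, which is presumably why it leaves this shared file. The mechanics are ordinary Catalyst tree rewriting. A reduced sketch of the idea, assuming only the scalar-subquery case, with markRelation standing in for setting the isSubquery flag:

    import org.apache.spark.sql.catalyst.expressions.ScalarSubquery
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
    import org.apache.spark.sql.execution.datasources.LogicalRelation

    // For every scalar subquery under a Filter, rewrite the subquery's
    // plan so each LogicalRelation passes through markRelation.
    def markSubqueryRelations(
        plan: LogicalPlan,
        markRelation: LogicalRelation => LogicalRelation): LogicalPlan = {
      plan.transform {
        case filter: Filter =>
          filter.transformExpressions {
            case s: ScalarSubquery =>
              val rewritten = s.plan.transform {
                case lr: LogicalRelation => markRelation(lr)
              }
              ScalarSubquery(rewritten, s.children, s.exprId)
          }
      }
    }
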
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
deleted file mode 100644
index a7a255e..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan}
-import org.apache.spark.sql.catalyst.parser.{ParserInterface, SqlBaseParser}
-import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
-import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
-import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SQLConf, SessionState}
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.CarbonSparkSqlParser
-
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * This class holds the Carbon catalog and refreshes the relation from the cache if the
- * CarbonTable in the Carbon catalog is not the same as the cached Carbon relation's CarbonTable.
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class CarbonHiveSessionCatalog(
- externalCatalog: HiveExternalCatalog,
- globalTempViewManager: GlobalTempViewManager,
- functionRegistry: FunctionRegistry,
- sparkSession: SparkSession,
- conf: SQLConf,
- hadoopConf: Configuration,
- parser: ParserInterface,
- functionResourceLoader: FunctionResourceLoader)
- extends HiveSessionCatalog (
- externalCatalog,
- globalTempViewManager,
- new HiveMetastoreCatalog(sparkSession),
- functionRegistry,
- conf,
- hadoopConf,
- parser,
- functionResourceLoader
- ) with CarbonSessionCatalog {
-
- private lazy val carbonEnv = {
- val env = new CarbonEnv
- env.init(sparkSession)
- env
- }
- /**
- * Returns the CarbonEnv instance.
- * @return
- */
- override def getCarbonEnv() : CarbonEnv = {
- carbonEnv
- }
-
- // Initialize all listeners to the Operation bus.
- CarbonEnv.initListeners()
-
- override def lookupRelation(name: TableIdentifier): LogicalPlan = {
- val rtnRelation = super.lookupRelation(name)
- val isRelationRefreshed =
- CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
- if (isRelationRefreshed) {
- super.lookupRelation(name)
- } else {
- rtnRelation
- }
- }
-
- /**
- * Returns the Hive client from HiveExternalCatalog.
- *
- * @return
- */
- override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
- sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
- .asInstanceOf[HiveExternalCatalog].client
- }
-
- def alterTableRename(oldTableIdentifier: TableIdentifier,
- newTableIdentifier: TableIdentifier,
- newTablePath: String): Unit = {
- getClient().runSqlHive(
- s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
- s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
- getClient().runSqlHive(
- s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
- s"SET SERDEPROPERTIES" +
- s"('tableName'='${ newTableIdentifier.table }', " +
- s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
- }
-
- override def alterTable(tableIdentifier: TableIdentifier,
- schemaParts: String,
- cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
- : Unit = {
- getClient()
- .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${ tableIdentifier.table } " +
- s"SET TBLPROPERTIES(${ schemaParts })")
- }
-
- override def alterAddColumns(tableIdentifier: TableIdentifier,
- schemaParts: String,
- cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
- : Unit = {
- alterTable(tableIdentifier, schemaParts, cols)
- }
-
- override def alterDropColumns(tableIdentifier: TableIdentifier,
- schemaParts: String,
- cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
- : Unit = {
- alterTable(tableIdentifier, schemaParts, cols)
- }
-
- override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
- schemaParts: String,
- cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
- : Unit = {
- alterTable(tableIdentifier, schemaParts, cols)
- }
-
- override def createPartitions(
- tableName: TableIdentifier,
- parts: Seq[CatalogTablePartition],
- ignoreIfExists: Boolean): Unit = {
- try {
- val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
- val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
- super.createPartitions(tableName, updatedParts, ignoreIfExists)
- } catch {
- case e: Exception =>
- super.createPartitions(tableName, parts, ignoreIfExists)
- }
- }
-
- /**
- * This is an alternate way of getting partition information. It first fetches all partitions
- * from Hive and then applies the filter, instead of querying Hive with the filters.
- * @param partitionFilters
- * @param sparkSession
- * @param identifier
- * @return
- */
- override def getPartitionsAlternate(partitionFilters: Seq[Expression],
- sparkSession: SparkSession,
- identifier: TableIdentifier) = {
- CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
- }
-
- /**
- * Update the storage format with the new location information.
- */
- override def updateStorageLocation(
- path: Path,
- storage: CatalogStorageFormat,
- newTableName: String,
- dbName: String): CatalogStorageFormat = {
- storage.copy(locationUri = Some(path.toUri))
- }
-}
-
-/**
- * Session state implementation that overrides the SQL parser and adds strategies.
- *
- * @param sparkSession
- */
-class CarbonSessionStateBuilder(sparkSession: SparkSession,
- parentState: Option[SessionState] = None)
- extends HiveSessionStateBuilder(sparkSession, parentState) {
-
- override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
- experimentalMethods.extraStrategies =
- Seq(new StreamingTableStrategy(sparkSession),
- new CarbonLateDecodeStrategy,
- new DDLStrategy(sparkSession)
- )
- experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
- new CarbonUDFTransformRule,
- new CarbonLateDecodeRule)
-
- /**
- * Internal catalog for managing table and database states.
- */
- override protected lazy val catalog: CarbonHiveSessionCatalog = {
- val catalog = new CarbonHiveSessionCatalog(
- externalCatalog,
- session.sharedState.globalTempViewManager,
- functionRegistry,
- sparkSession,
- conf,
- SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
- sqlParser,
- resourceLoader)
- parentState.foreach(_.catalog.copyStateTo(catalog))
- catalog
- }
-
- private def externalCatalog: HiveExternalCatalog =
- session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
-
- /**
- * Create a Hive aware resource loader.
- */
- override protected lazy val resourceLoader: HiveSessionResourceLoader = {
- val client: HiveClient = externalCatalog.client.newSession()
- new HiveSessionResourceLoader(session, client)
- }
-
- override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
- override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
- new Analyzer(catalog, conf) {
-
- override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
- new ResolveHiveSerdeTable(session) +:
- new FindDataSourceTable(session) +:
- new ResolveSQLOnFile(session) +:
- new CarbonIUDAnalysisRule(sparkSession) +:
- new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
-
- override val extendedCheckRules: Seq[LogicalPlan => Unit] =
- PreWriteCheck :: HiveOnlyCheck :: Nil
-
- override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
- new DetermineTableStats(session) +:
- RelationConversions(conf, catalog) +:
- PreprocessTableCreation(session) +:
- PreprocessTableInsertion(conf) +:
- DataSourceAnalysis(conf) +:
- HiveAnalysis +:
- customPostHocResolutionRules
- }
- )
-
- override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
deleted file mode 100644
index ac702ea..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
-import org.apache.spark.util.CarbonReflectionUtils
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-
-/**
- * This object refreshes the relation from the cache if the CarbonTable in the
- * Carbon catalog is not the same as the cached Carbon relation's CarbonTable.
- */
-object CarbonSessionUtil {
-
- val LOGGER = LogServiceFactory.getLogService("CarbonSessionUtil")
-
- /**
- * The method refreshes the cache entry
- *
- * @param rtnRelation [[LogicalPlan]] represents the given table or view.
- * @param name tableName
- * @param sparkSession
- * @return
- */
- def refreshRelation(rtnRelation: LogicalPlan, name: TableIdentifier)
- (sparkSession: SparkSession): Boolean = {
- var isRelationRefreshed = false
- rtnRelation match {
- case SubqueryAlias(_,
- LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)
- ) =>
- isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
- case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
- isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
- case SubqueryAlias(_, relation) if
- relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
- relation.getClass.getName
- .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
- relation.getClass.getName.equals(
- "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation"
- ) =>
- val catalogTable =
- CarbonReflectionUtils.getFieldOfCatalogTable(
- "tableMeta",
- relation
- ).asInstanceOf[CatalogTable]
- isRelationRefreshed =
- CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
- case _ =>
- }
- isRelationRefreshed
- }
-
- /**
- * This is an alternate way of getting partition information. It first fetches all partitions
- * from Hive and then applies the filter, instead of querying Hive with the filters.
- *
- * @param partitionFilters
- * @param sparkSession
- * @param identifier
- * @return
- */
- def prunePartitionsByFilter(partitionFilters: Seq[Expression],
- sparkSession: SparkSession,
- identifier: TableIdentifier): Seq[CatalogTablePartition] = {
- val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
- ExternalCatalogUtils.prunePartitionsByFilter(
- sparkSession.sessionState.catalog.getTableMetadata(identifier),
- allPartitions,
- partitionFilters,
- sparkSession.sessionState.conf.sessionLocalTimeZone
- )
- }
-
-}
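
prunePartitionsByFilter above deliberately lists every partition and prunes on the driver instead of pushing the filter to the Hive metastore, trading metadata volume for predictable behavior across metastore versions. A hedged usage sketch; the table name and partition column are made up for illustration:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
    import org.apache.spark.sql.types.StringType

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    // A resolved predicate on a hypothetical partition column dt of
    // a hypothetical table default.t1.
    val dt = AttributeReference("dt", StringType)()
    val pruned = CarbonSessionUtil.prunePartitionsByFilter(
      Seq(EqualTo(dt, Literal("2018-09-05"))),
      spark,
      TableIdentifier("t1", Some("default")))
    pruned.foreach(p => println(p.spec))
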
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
index b8bf56d..7177f1a 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -18,24 +18,15 @@
package org.apache.spark.sql.hive
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.string
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel}
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
-import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParserUtil}
-import org.apache.spark.sql.types.DecimalType
-
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
- extends SparkSqlAstBuilder(conf) {
+ extends SparkSqlAstBuilder(conf) with SqlAstBuilderHelper {
val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
@@ -55,75 +46,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
}
}
- override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
-
- val newColumn = visitColType(ctx.colType)
- if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
- throw new MalformedCarbonCommandException(
- "Column names provided are different. Both the column names should be same")
- }
-
- val (typeString, values) : (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
- case d:DecimalType => ("decimal", Some(List((d.precision, d.scale))))
- case _ => (newColumn.dataType.typeName.toLowerCase, None)
- }
-
- val alterTableChangeDataTypeModel =
- AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
- new CarbonSpark2SqlParser()
- .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
- ctx.tableIdentifier().table.getText.toLowerCase,
- ctx.identifier.getText.toLowerCase,
- newColumn.name.toLowerCase)
-
- CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
- }
-
-
override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
- val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
- val fields = parser.getFields(cols)
- val tblProperties = scala.collection.mutable.Map.empty[String, String]
- val tableModel = new CarbonSpark2SqlParser().prepareTableModel (false,
- new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
- .map(_.getText)),
- ctx.tableIdentifier.table.getText.toLowerCase,
- fields,
- Seq.empty,
- tblProperties,
- None,
- true)
-
- val alterTableAddColumnsModel = AlterTableAddColumnsModel(
- Option(ctx.tableIdentifier().db).map(_.getText),
- ctx.tableIdentifier.table.getText,
- tblProperties.toMap,
- tableModel.dimCols,
- tableModel.msrCols,
- tableModel.highcardinalitydims.getOrElse(Seq.empty))
-
- CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
- }
-
- override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
- super.visitCreateTable(ctx)
- }
-
- override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
- withOrigin(ctx) {
- if (CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
- CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
- super.visitShowTables(ctx)
- } else {
- CarbonShowTablesCommand(
- Option(ctx.db).map(_.getText),
- Option(ctx.pattern).map(string))
- }
- }
- }
-
- override def visitExplain(ctx: SqlBaseParser.ExplainContext): LogicalPlan = {
- CarbonExplainCommand(super.visitExplain(ctx))
+ visitAddTableColumns(parser, ctx)
}
}
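
The builder now mixes in SqlAstBuilderHelper, and the visitChangeColumn/visitAddTableColumns bodies deleted above move behind that trait so the 2.2 and 2.3 builders can share them. A sketch of the trait's likely shape for the add-columns path, reconstructed from the removed body rather than from the trait's actual source:

    import org.apache.spark.sql.catalyst.parser.SqlBaseParser.AddTableColumnsContext
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkSqlAstBuilder
    import org.apache.spark.sql.execution.command.AlterTableAddColumnsModel
    import org.apache.spark.sql.execution.command.schema.CarbonAlterTableAddColumnCommand
    import org.apache.spark.sql.parser.CarbonSpark2SqlParser

    trait SqlAstBuilderHelper extends SparkSqlAstBuilder {

      // Shared ALTER TABLE ADD COLUMNS handling; each version-specific
      // builder passes in its own CarbonSpark2SqlParser.
      def visitAddTableColumns(parser: CarbonSpark2SqlParser,
          ctx: AddTableColumnsContext): LogicalPlan = {
        val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
        val fields = parser.getFields(cols)
        val tblProperties = scala.collection.mutable.Map.empty[String, String]
        val tableModel = new CarbonSpark2SqlParser().prepareTableModel(false,
          new CarbonSpark2SqlParser().convertDbNameToLowerCase(
            Option(ctx.tableIdentifier().db).map(_.getText)),
          ctx.tableIdentifier.table.getText.toLowerCase,
          fields,
          Seq.empty,
          tblProperties,
          None,
          true)

        val alterTableAddColumnsModel = AlterTableAddColumnsModel(
          Option(ctx.tableIdentifier().db).map(_.getText),
          ctx.tableIdentifier.table.getText,
          tblProperties.toMap,
          tableModel.dimCols,
          tableModel.msrCols,
          tableModel.highcardinalitydims.getOrElse(Seq.empty))

        CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
      }
    }
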
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala
deleted file mode 100644
index 2128ffd..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlConf.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf.buildConf
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * Initializes default values for dynamic parameters.
- */
-class CarbonSQLConf(sparkSession: SparkSession) {
-
- val carbonProperties = CarbonProperties.getInstance()
-
- /**
- * Initializes dynamic parameter defaults along with usage docs.
- */
- def addDefaultCarbonParams(): Unit = {
- val ENABLE_UNSAFE_SORT =
- buildConf(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
- .doc("To enable/ disable unsafe sort.")
- .booleanConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
- val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
- buildConf(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
- .doc("To set carbon task distribution.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
- CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
- val BAD_RECORDS_LOGGER_ENABLE =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
- .doc("To enable/ disable carbon bad record logger.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants
- .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
- val BAD_RECORDS_ACTION =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
- .doc("To configure the bad records action.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
- val IS_EMPTY_DATA_BAD_RECORD =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
- .doc("Property to decide weather empty data to be considered bad/ good record.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
- .toBoolean)
- val SORT_SCOPE =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
- .doc("Property to specify sort scope.")
- .stringConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
- val BATCH_SORT_SIZE_INMB =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
- .doc("Property to specify batch sort size in MB.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
- val SINGLE_PASS =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
- .doc("Property to enable/disable single_pass.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
- val BAD_RECORD_PATH =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
- .doc("Property to configure the bad record location.")
- .stringConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- val GLOBAL_SORT_PARTITIONS =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
- .doc("Property to configure the global sort partitions.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
- CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
- val DATEFORMAT =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
- .doc("Property to configure data format for date type columns.")
- .stringConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
- val CARBON_INPUT_SEGMENTS = buildConf(
- "carbon.input.segments.<database_name>.<table_name>")
- .doc("Property to configure the list of segments to query.").stringConf
- .createWithDefault(carbonProperties
- .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
- }
- /**
- * Sets the default values of the dynamic properties.
- */
- def addDefaultCarbonSessionParams(): Unit = {
- sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
- sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
- carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
- CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
- carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
- CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
- CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
- carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
- CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
- CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
deleted file mode 100644
index 4e22d13..0000000
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.net.URI
-
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
-import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
-import org.apache.spark.sql.sources.BaseRelation
-
-/**
- * Create a table 'using carbondata' and insert the query result into it.
- *
- * @param table the catalog table
- * @param mode SaveMode: Ignore, Overwrite, ErrorIfExists, Append
- * @param query the query whose result will be inserted into the new relation
- *
- */
-
-case class CreateCarbonSourceTableAsSelectCommand(
- table: CatalogTable,
- mode: SaveMode,
- query: LogicalPlan)
- extends RunnableCommand {
-
- override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- assert(table.tableType != CatalogTableType.VIEW)
- assert(table.provider.isDefined)
-
- val sessionState = sparkSession.sessionState
- val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
- val tableIdentWithDB = table.identifier.copy(database = Some(db))
- val tableName = tableIdentWithDB.unquotedString
-
- if (sessionState.catalog.tableExists(tableIdentWithDB)) {
- assert(mode != SaveMode.Overwrite,
- s"Expect the table $tableName has been dropped when the save mode is Overwrite")
-
- if (mode == SaveMode.ErrorIfExists) {
- throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
- }
- if (mode == SaveMode.Ignore) {
- // Since the table already exists and the save mode is Ignore, we will just return.
- return Seq.empty
- }
-
- saveDataIntoTable(
- sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
- } else {
- assert(table.schema.isEmpty)
-
- val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
- Some(sessionState.catalog.defaultTablePath(table.identifier))
- } else {
- table.storage.locationUri
- }
- val result = saveDataIntoTable(
- sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
-
- result match {
- case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
- sparkSession.sqlContext.conf.manageFilesourcePartitions =>
- // Need to recover partitions into the metastore so our saved data is visible.
- sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
- case _ =>
- }
- }
-
- Seq.empty[Row]
- }
-
- private def saveDataIntoTable(
- session: SparkSession,
- table: CatalogTable,
- tableLocation: Option[URI],
- data: LogicalPlan,
- mode: SaveMode,
- tableExists: Boolean): BaseRelation = {
- // Create the relation based on the input logical plan: `data`.
- val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
- val dataSource = DataSource(
- session,
- className = table.provider.get,
- partitionColumns = table.partitionColumnNames,
- bucketSpec = table.bucketSpec,
- options = table.storage.properties ++ pathOption,
- catalogTable = if (tableExists) {
- Some(table)
- } else {
- None
- })
-
- try {
- dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
- } catch {
- case ex: AnalysisException =>
- logError(s"Failed to write to table ${ table.identifier.unquotedString }", ex)
- throw ex
- }
- }
-}
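
For context, the removed command backed CREATE TABLE ... USING carbondata AS SELECT on the Spark 2.2 profile. A hypothetical statement it would service (table and column names are placeholders):

    // Hypothetical CTAS routed through this command:
    spark.sql(
      """CREATE TABLE target_table
        |USING carbondata
        |AS SELECT id, name FROM source_table""".stripMargin)
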
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
new file mode 100644
index 0000000..cec4c36
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+object CarbonToSparkAdapater {
+
+ def addSparkListener(sparkContext: SparkContext) = {
+ sparkContext.addSparkListener(new SparkListener {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+ SparkSession.setDefaultSession(null)
+ }
+ })
+ }
+
+ def createAttributeReference(name: String, dataType: DataType, nullable: Boolean,
+ metadata: Metadata, exprId: ExprId, qualifier: Option[String],
+ attrRef : NamedExpression): AttributeReference = {
+ AttributeReference(
+ name,
+ dataType,
+ nullable,
+ metadata)(exprId, qualifier)
+ }
+
+ def createAliasRef(child: Expression,
+ name: String,
+ exprId: ExprId = NamedExpression.newExprId,
+ qualifier: Option[String] = None,
+ explicitMetadata: Option[Metadata] = None,
+ namedExpr : Option[NamedExpression] = None ) : Alias = {
+
+ Alias(child, name)(exprId, qualifier, explicitMetadata)
+ }
+
+ def getExplainCommandObj() : ExplainCommand = {
+ ExplainCommand(OneRowRelation())
+ }
+
+ /**
+ * As a part of SPARK-24085, Hive tables support scalar subqueries for
+ * partition tables, so Carbon also needs to support them.
+ * @param partitionSet
+ * @param filterPredicates
+ * @return
+ */
+ def getPartitionKeyFilter(
+ partitionSet: AttributeSet,
+ filterPredicates: Seq[Expression]): ExpressionSet = {
+ ExpressionSet(
+ ExpressionSet(filterPredicates)
+ .filterNot(SubqueryExpression.hasSubquery)
+ .filter(_.references.subsetOf(partitionSet)))
+ }
+
+ // As per SPARK-22520, OptimizeCodegen is removed in 2.3.1.
+ def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
+ Seq.empty
+ }
+}
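
The new adapter concentrates the Spark calls whose shape changed between 2.2 and 2.3 (OneRowRelation became a case class, OptimizeCodegen was removed per SPARK-22520, and SPARK-24085 changed partition-filter handling), so common code compiles against one stable surface per Spark profile. An illustrative call site; the Literal and alias name are placeholder values, not code from this commit:

    import org.apache.spark.sql.CarbonToSparkAdapater
    import org.apache.spark.sql.catalyst.expressions.{Literal, NamedExpression}

    // Build an Alias through the adapter instead of calling the
    // version-specific Alias constructor directly.
    val alias = CarbonToSparkAdapater.createAliasRef(
      child = Literal(1),
      name = "one",
      exprId = NamedExpression.newExprId)

    // EXPLAIN plans likewise go through the adapter, because
    // OneRowRelation needs parentheses in 2.3 but not in 2.2.
    val explain = CarbonToSparkAdapater.getExplainCommandObj()
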
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java
deleted file mode 100644
index 783a528..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonVectorProxy.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql;
-
-import java.math.BigInteger;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.ColumnVectorFactory;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.sql.types.*;
-import org.apache.spark.sql.vectorized.ColumnVector;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Adapter class that handles the columnar vector reading of CarbonData
- * based on the Spark ColumnVector and ColumnarBatch APIs. This proxy class
- * handles the complexity of the Spark 2.3 API changes, since the Spark
- * ColumnVector and ColumnarBatch interfaces are still evolving.
- */
-public class CarbonVectorProxy {
-
- private ColumnarBatch columnarBatch;
- private WritableColumnVector[] writableColumnVectors;
-
- /**
- * Creates the proxy over an on-heap or off-heap columnar batch built
- * from the given schema fields.
- *
- * @param memMode whether the vectors are allocated on-heap or off-heap
- * @param rowNum number of rows for vector reading
- * @param structFileds metadata related to the current schema of the table
- */
- public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
- writableColumnVectors = ColumnVectorFactory
- .getColumnVector(memMode, new StructType(structFileds), rowNum);
- columnarBatch = new ColumnarBatch(writableColumnVectors);
- columnarBatch.setNumRows(rowNum);
- }
-
- public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
- writableColumnVectors = ColumnVectorFactory
- .getColumnVector(memMode, outputSchema, rowNum);
- columnarBatch = new ColumnarBatch(writableColumnVectors);
- columnarBatch.setNumRows(rowNum);
- }
-
- /**
- * Returns the number of rows to read, including filtered rows.
- */
- public int numRows() {
- return columnarBatch.numRows();
- }
-
- public Object reserveDictionaryIds(int capacity, int ordinal) {
- return writableColumnVectors[ordinal].reserveDictionaryIds(capacity);
- }
-
- /**
- * Returns the ColumnVector at the given ordinal from the batch of
- * column vectors.
- *
- * @param ordinal
- * @return
- */
- public ColumnVector column(int ordinal) {
- return columnarBatch.column(ordinal);
- }
-
- /**
- * Resets this column for writing. The currently stored values are no longer accessible.
- */
- public void reset() {
- for (WritableColumnVector col : writableColumnVectors) {
- col.reset();
- }
- }
-
- public void resetDictionaryIds(int ordinal) {
- writableColumnVectors[ordinal].getDictionaryIds().reset();
- }
-
- /**
- * Returns the row in this batch at `rowId`. Returned row is reused across calls.
- */
- public InternalRow getRow(int rowId) {
- return columnarBatch.getRow(rowId);
- }
-
- /**
- * Returns the underlying ColumnarBatch wrapped by this proxy.
- */
- public Object getColumnarBatch() {
- return columnarBatch;
- }
-
- /**
- * Called to close all the columns in this batch. It is not valid to access the data after
- * calling this. This must be called at the end to clean up memory allocations.
- */
- public void close() {
- columnarBatch.close();
- }
-
- /**
- * Sets the number of rows in this batch.
- */
- public void setNumRows(int numRows) {
- columnarBatch.setNumRows(numRows);
- }
-
- /**
- * Writes the given value into the column vector at column ordinal
- * {@code offset}, dispatching on that column's data type.
- *
- * @param rowId row position to write to.
- * @param value value to write; null is recorded as a null entry.
- * @param offset ordinal of the target column within the batch.
- */
- public void putRowToColumnBatch(int rowId, Object value, int offset) {
- DataType t = dataType(offset);
- if (null == value) {
- putNull(rowId, offset);
- } else {
- if (t == DataTypes.BooleanType) {
- putBoolean(rowId, (boolean) value, offset);
- } else if (t == DataTypes.ByteType) {
- putByte(rowId, (byte) value, offset);
- } else if (t == DataTypes.ShortType) {
- putShort(rowId, (short) value, offset);
- } else if (t == DataTypes.IntegerType) {
- putInt(rowId, (int) value, offset);
- } else if (t == DataTypes.LongType) {
- putLong(rowId, (long) value, offset);
- } else if (t == DataTypes.FloatType) {
- putFloat(rowId, (float) value, offset);
- } else if (t == DataTypes.DoubleType) {
- putDouble(rowId, (double) value, offset);
- } else if (t == DataTypes.StringType) {
- UTF8String v = (UTF8String) value;
- putByteArray(rowId, v.getBytes(), offset);
- } else if (t instanceof DecimalType) {
- DecimalType dt = (DecimalType) t;
- Decimal d = Decimal.fromDecimal(value);
- if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
- putInt(rowId, (int) d.toUnscaledLong(), offset);
- } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
- putLong(rowId, d.toUnscaledLong(), offset);
- } else {
- final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
- byte[] bytes = integer.toByteArray();
- putByteArray(rowId, bytes, 0, bytes.length, offset);
- }
- } else if (t instanceof CalendarIntervalType) {
- CalendarInterval c = (CalendarInterval) value;
- writableColumnVectors[offset].getChild(0).putInt(rowId, c.months);
- writableColumnVectors[offset].getChild(1).putLong(rowId, c.microseconds);
- } else if (t instanceof DateType) {
- putInt(rowId, (int) value, offset);
- } else if (t instanceof TimestampType) {
- putLong(rowId, (long) value, offset);
- }
- }
- }
-
- public void putBoolean(int rowId, boolean value, int ordinal) {
- writableColumnVectors[ordinal].putBoolean(rowId, value);
- }
-
- public void putByte(int rowId, byte value, int ordinal) {
- writableColumnVectors[ordinal].putByte(rowId, value);
- }
-
- public void putShort(int rowId, short value, int ordinal) {
- writableColumnVectors[ordinal].putShort(rowId, value);
- }
-
- public void putInt(int rowId, int value, int ordinal) {
- writableColumnVectors[ordinal].putInt(rowId, value);
- }
-
- public void putFloat(int rowId, float value, int ordinal) {
- writableColumnVectors[ordinal].putFloat(rowId, value);
- }
-
- public void putLong(int rowId, long value, int ordinal) {
- writableColumnVectors[ordinal].putLong(rowId, value);
- }
-
- public void putDouble(int rowId, double value, int ordinal) {
- writableColumnVectors[ordinal].putDouble(rowId, value);
- }
-
- public void putByteArray(int rowId, byte[] value, int ordinal) {
- writableColumnVectors[ordinal].putByteArray(rowId, value);
- }
-
- public void putInts(int rowId, int count, int value, int ordinal) {
- writableColumnVectors[ordinal].putInts(rowId, count, value);
- }
-
- public void putShorts(int rowId, int count, short value, int ordinal) {
- writableColumnVectors[ordinal].putShorts(rowId, count, value);
- }
-
- public void putLongs(int rowId, int count, long value, int ordinal) {
- writableColumnVectors[ordinal].putLongs(rowId, count, value);
- }
-
- public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
- writableColumnVectors[ordinal].putDecimal(rowId, value, precision);
- }
-
- public void putDoubles(int rowId, int count, double value, int ordinal) {
- writableColumnVectors[ordinal].putDoubles(rowId, count, value);
- }
-
- public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
- writableColumnVectors[ordinal].putByteArray(rowId, value, offset, length);
- }
-
- public void putNull(int rowId, int ordinal) {
- writableColumnVectors[ordinal].putNull(rowId);
- }
-
- public void putNulls(int rowId, int count, int ordinal) {
- writableColumnVectors[ordinal].putNulls(rowId, count);
- }
-
- public boolean isNullAt(int rowId, int ordinal) {
- return writableColumnVectors[ordinal].isNullAt(rowId);
- }
-
- public DataType dataType(int ordinal) {
- return writableColumnVectors[ordinal].dataType();
- }
-}
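
For reference, a minimal usage sketch of the CarbonVectorProxy API above, as
it stood (illustrative Scala, not part of this patch; the object name is
hypothetical, and Spark 2.3 plus the proxy class are assumed on the classpath):

import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.CarbonVectorProxy
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.unsafe.types.UTF8String

object VectorProxySketch {
  def main(args: Array[String]): Unit = {
    val schema = new StructType()
      .add("id", DataTypes.IntegerType)
      .add("name", DataTypes.StringType)
    // Allocate an on-heap batch with capacity for 4 rows.
    val proxy = new CarbonVectorProxy(MemoryMode.ON_HEAP, schema, 4)
    // Write one row; the last argument is the column ordinal.
    proxy.putRowToColumnBatch(0, Integer.valueOf(1), 0)
    proxy.putRowToColumnBatch(0, UTF8String.fromString("carbon"), 1)
    proxy.setNumRows(1)
    // Read the row back through the batch view, then release the memory.
    val row = proxy.getRow(0)
    println(s"id=${row.getInt(0)}, name=${row.getUTF8String(1)}")
    proxy.close()
  }
}
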
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java
deleted file mode 100644
index 6fe5ede..0000000
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/ColumnVectorFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
-import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
-import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.sql.types.StructType;
-
-public class ColumnVectorFactory {
-
- public static WritableColumnVector[] getColumnVector(MemoryMode memMode, StructType outputSchema, int rowNums) {
- WritableColumnVector[] writableColumnVectors = null;
- switch (memMode) {
- case ON_HEAP:
- writableColumnVectors = OnHeapColumnVector
- .allocateColumns(rowNums, outputSchema);
- break;
- case OFF_HEAP:
- writableColumnVectors = OffHeapColumnVector
- .allocateColumns(rowNums, outputSchema);
- break;
- }
- return writableColumnVectors;
- }
-}
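
A similarly hedged sketch of calling the factory above directly, as it stood
(illustrative Scala; the object name is hypothetical):

import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.ColumnVectorFactory
import org.apache.spark.sql.types.{DataTypes, StructType}

object FactorySketch {
  def main(args: Array[String]): Unit = {
    val schema = new StructType().add("value", DataTypes.LongType)
    // ON_HEAP allocates OnHeapColumnVector instances; OFF_HEAP would back
    // the vectors with unsafe off-heap memory instead.
    val vectors = ColumnVectorFactory.getColumnVector(MemoryMode.ON_HEAP, schema, 8)
    vectors(0).putLong(0, 42L)
    println(vectors(0).getLong(0))
    vectors.foreach(_.close())
  }
}
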
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala
new file mode 100644
index 0000000..f1b632b
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.ExperimentalMethods
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.internal.SQLConf
+
+
+class CarbonOptimizer(
+ catalog: SessionCatalog,
+ conf: SQLConf,
+ experimentalMethods: ExperimentalMethods)
+ extends SparkOptimizer(catalog, experimentalMethods) {
+
+ override def execute(plan: LogicalPlan): LogicalPlan = {
+ val transformedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
+ super.execute(transformedPlan)
+ }
+}
\ No newline at end of file
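
A hedged sketch of where an optimizer like this is typically plugged in on
Spark 2.3 (the builder name and wiring are illustrative, not this patch's
actual session code):

package org.apache.spark.sql.hive

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.optimizer.Optimizer

// Hypothetical builder: overrides the optimizer hook exposed by
// BaseSessionStateBuilder and hands the session's catalog, conf and
// experimental methods to CarbonOptimizer.
class ExampleSessionStateBuilder(session: SparkSession)
  extends HiveSessionStateBuilder(session, None) {

  override protected def optimizer: Optimizer =
    new CarbonOptimizer(catalog, conf, experimentalMethods)
}
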
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
new file mode 100644
index 0000000..73b6790
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.parser.ParserUtils.string
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParserUtil}
+
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+ extends SparkSqlAstBuilder(conf) with SqlAstBuilderHelper {
+
+ val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
+
+ override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
+ val fileStorage = CarbonSparkSqlParserUtil.getFileStorage(ctx.createFileFormat(0))
+
+ if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+ fileStorage.equalsIgnoreCase("carbondata") ||
+ fileStorage.equalsIgnoreCase("'carbonfile'") ||
+ fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+ val createTableTuple = (ctx.createTableHeader, ctx.skewSpec(0),
+ ctx.bucketSpec(0), ctx.partitionColumns, ctx.columns, ctx.tablePropertyList(0),
+ ctx.locationSpec(0), Option(ctx.STRING(0)).map(string), ctx.AS, ctx.query, fileStorage)
+ helper.createCarbonTable(createTableTuple)
+ } else {
+ super.visitCreateHiveTable(ctx)
+ }
+ }
+
+ override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
+ visitAddTableColumns(parser, ctx)
+ }
+}
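
For context, the kind of DDL the overridden visitCreateHiveTable routes to the
Carbon helper looks like the following (illustrative Scala against a session
whose parser includes this builder; the table name is made up):

import org.apache.spark.sql.SparkSession

object CreateTableSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("carbon-ddl-sketch")
      .getOrCreate()
    // With the Carbon AST builder installed, the 'carbondata' file format is
    // picked up by getFileStorage and handed to createCarbonTable; any other
    // format falls through to Spark's default handling.
    spark.sql(
      """CREATE TABLE IF NOT EXISTS demo_table (id INT, name STRING)
        |STORED BY 'carbondata'""".stripMargin)
    spark.stop()
  }
}
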
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java b/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java
new file mode 100644
index 0000000..f357e41
--- /dev/null
+++ b/integration/spark2/src/test/java/org/apache/carbondata/stream/CarbonStreamRecordReaderTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.streaming.CarbonStreamInputFormat;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamRecordReaderTest extends TestCase {
+
+ private TaskAttemptID taskAttemptId;
+ private TaskAttemptContext taskAttemptContext;
+ private Configuration hadoopConf;
+ private AbsoluteTableIdentifier identifier;
+ private String tablePath;
+
+
+ @Override protected void setUp() throws Exception {
+ tablePath = new File("target/stream_input").getCanonicalPath();
+ String dbName = "default";
+ String tableName = "stream_table_input";
+ identifier = AbsoluteTableIdentifier.from(
+ tablePath,
+ new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+ JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
+ TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+ taskAttemptId = new TaskAttemptID(taskId, 0);
+
+ hadoopConf = new Configuration();
+ taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+ }
+
+ private InputSplit buildInputSplit() throws IOException {
+ CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
+ List<CarbonInputSplit> splitList = new ArrayList<>();
+ splitList.add(carbonInputSplit);
+ return new CarbonMultiBlockSplit(splitList, new String[] { "localhost" },
+ FileFormat.ROW_V1);
+ }
+
+ @Test public void testCreateRecordReader() {
+ try {
+ InputSplit inputSplit = buildInputSplit();
+ CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
+ RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
+ Assert.assertNotNull("Failed to create record reader", recordReader);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Override protected void tearDown() throws Exception {
+ super.tearDown();
+ if (tablePath != null) {
+ FileFactory.deleteAllFilesOfDir(new File(tablePath));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
index cdcf018..466475b 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesFilterTest.scala
@@ -80,7 +80,7 @@ class BooleanDataTypesFilterTest extends QueryTest with BeforeAndAfterEach with
checkAnswer(sql("select count(*) from carbon_table where booleanField = true"),
Row(4))
- if (!Spark2TestQueryExecutor.spark.version.startsWith("2.2")) {
+ if (Spark2TestQueryExecutor.spark.version.startsWith("2.1")) {
checkAnswer(sql("select count(*) from carbon_table where booleanField = 'true'"),
Row(0))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
index c220f1c..4050fd8 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesLoadTest.scala
@@ -283,7 +283,7 @@ class BooleanDataTypesLoadTest extends QueryTest with BeforeAndAfterEach with Be
checkAnswer(sql("select count(*) from boolean_table where booleanField = false or booleanField = true"),
Row(10))
- if (!Spark2TestQueryExecutor.spark.version.startsWith("2.2")) {
+ if (Spark2TestQueryExecutor.spark.version.startsWith("2.1")) {
checkAnswer(sql("select count(*) from boolean_table where booleanField = 'true'"),
Row(0))
@@ -373,7 +373,7 @@ class BooleanDataTypesLoadTest extends QueryTest with BeforeAndAfterEach with Be
checkAnswer(sql("select count(*) from boolean_table where booleanField = false or booleanField = true"),
Row(10))
- if (!Spark2TestQueryExecutor.spark.version.startsWith("2.2")) {
+ if (Spark2TestQueryExecutor.spark.version.startsWith("2.1")) {
checkAnswer(sql("select count(*) from boolean_table where booleanField = 'true'"),
Row(0))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 363db65..8aa0cf6 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -19,7 +19,7 @@ package org.apache.spark.carbondata.bucketing
import org.apache.spark.sql.CarbonEnv
import org.apache.spark.sql.common.util.Spark2QueryTest
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.Exchange
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -128,7 +128,11 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
""".stripMargin).queryExecution.executedPlan
var shuffleExists = false
plan.collect {
- case s: ShuffleExchange => shuffleExists = true
+ case s: Exchange if (s.getClass.getName.equals
+ ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+ s.getClass.getName.equals
+ ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+ => shuffleExists = true
}
assert(shuffleExists, "shuffle should exist on non bucket tables")
}
@@ -146,7 +150,11 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
""".stripMargin).queryExecution.executedPlan
var shuffleExists = false
plan.collect {
- case s: ShuffleExchange => shuffleExists = true
+ case s: Exchange if (s.getClass.getName.equals
+ ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+ s.getClass.getName.equals
+ ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+ => shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
}
@@ -171,7 +179,11 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
""".stripMargin).queryExecution.executedPlan
var shuffleExists = false
plan.collect {
- case s: ShuffleExchange => shuffleExists = true
+ case s: Exchange if (s.getClass.getName.equals
+ ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+ s.getClass.getName.equals
+ ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+ => shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
sql("DROP TABLE bucketed_parquet_table")
@@ -196,7 +208,11 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
""".stripMargin).queryExecution.executedPlan
var shuffleExists = false
plan.collect {
- case s: ShuffleExchange => shuffleExists = true
+ case s: Exchange if (s.getClass.getName.equals
+ ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+ s.getClass.getName.equals
+ ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+ => shuffleExists = true
}
assert(shuffleExists, "shuffle should exist on non bucket tables")
sql("DROP TABLE parquet_table")
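
The class-name check repeated in the hunks above could be factored into a
small helper; a sketch under the same assumptions (illustrative, not part of
this patch):

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.Exchange

object ShuffleCheck {
  // Spark 2.2 names the operator ShuffleExchange; Spark 2.3 renamed it to
  // ShuffleExchangeExec. Matching on the shared Exchange parent and comparing
  // class names keeps the test source compatible with both versions.
  private val shuffleClassNames = Set(
    "org.apache.spark.sql.execution.exchange.ShuffleExchange",
    "org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")

  def hasShuffle(plan: SparkPlan): Boolean =
    plan.collect {
      case e: Exchange if shuffleClassNames(e.getClass.getName) => e
    }.nonEmpty
}
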
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
index 66cf675..089dfd2 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.carbondata.query
import org.apache.spark.sql.Row
import org.apache.spark.sql.common.util.Spark2QueryTest
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.scalatest.BeforeAndAfterAll