Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:07:54 UTC
[07/50] [abbrv] carbondata git commit: [CARBONDATA-1151] Refactor all carbon commands into separate files in spark2 integration
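
For readers skimming the diff, the gist of the refactor: the command case classes that previously lived directly in org.apache.spark.sql.execution.command are renamed with a Command suffix and regrouped into purpose-specific sub-packages. A rough sketch of the mapping, reconstructed from the import changes below (illustrative, not exhaustive; CreateTable becomes CarbonCreateTableCommand but, per the import in CarbonSparkSqlParser.scala, stays in the base command package):

    // before: flat package org.apache.spark.sql.execution.command
    //   LoadTable, AlterTableCompaction, DeleteLoadsById, DeleteLoadsByLoadDate,
    //   CleanFiles, AlterTableDropPartition, AlterTableSplitPartitionCommand,
    //   AlterTableDataTypeChange, AlterTableAddColumns, AlterTableDropColumns
    // after: grouped by concern
    import org.apache.spark.sql.execution.command.management.{
      AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand,
      DeleteLoadByLoadDateCommand, LoadTableCommand}
    import org.apache.spark.sql.execution.command.partition.{
      AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand}
    import org.apache.spark.sql.execution.command.schema.{
      AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand}
    import org.apache.spark.sql.execution.command.CarbonCreateTableCommand
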
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 24b2981..9f4a8ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -21,11 +21,14 @@ import scala.collection.mutable
import scala.language.implicitConversions
import org.apache.spark.sql.{DeleteRecords, ShowLoadsCommand, UpdateTable}
-import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.CarbonDDLSqlParser
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand, DeleteLoadByLoadDateCommand, LoadTableCommand}
+import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand}
+import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand}
import org.apache.spark.sql.types.StructField
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -45,10 +48,10 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
initLexical
phrase(start)(new lexical.Scanner(input)) match {
case Success(plan, _) => plan match {
- case x: LoadTable =>
+ case x: LoadTableCommand =>
x.inputSqlString = input
x
- case x: AlterTableCompaction =>
+ case x: AlterTableCompactionCommand =>
x.alterTableModel.alterSql = input
x
case logicalPlan => logicalPlan
@@ -80,7 +83,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
case dbName ~ table ~ addInfo =>
val alterTableAddPartitionModel =
AlterTableSplitPartitionModel(dbName, table, "0", addInfo)
- AlterTableSplitPartitionCommand(alterTableAddPartitionModel)
+ AlterTableSplitCarbonPartitionCommand(alterTableAddPartitionModel)
}
protected lazy val alterSplitPartition: Parser[LogicalPlan] =
@@ -92,7 +95,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
if (partitionId == 0) {
sys.error("Please use [Alter Table Add Partition] statement to split default partition!")
}
- AlterTableSplitPartitionCommand(alterTableSplitPartitionModel)
+ AlterTableSplitCarbonPartitionCommand(alterTableSplitPartitionModel)
}
protected lazy val alterDropPartition: Parser[LogicalPlan] =
@@ -105,7 +108,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
}
val alterTableDropPartitionModel =
AlterTableDropPartitionModel(dbName, table, partitionId, dropWithData)
- AlterTableDropPartition(alterTableDropPartitionModel)
+ AlterTableDropCarbonPartitionCommand(alterTableDropPartitionModel)
}
@@ -115,7 +118,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
val altertablemodel =
AlterTableModel(convertDbNameToLowerCase(dbName), table, None, compactType,
Some(System.currentTimeMillis()), null)
- AlterTableCompaction(altertablemodel)
+ AlterTableCompactionCommand(altertablemodel)
}
protected lazy val deleteRecords: Parser[LogicalPlan] =
@@ -253,7 +256,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
validateOptions(optionsList)
}
val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
- LoadTable(convertDbNameToLowerCase(databaseNameOp), tableName, filePath, Seq(), optionsMap,
+ LoadTableCommand(
+ convertDbNameToLowerCase(databaseNameOp),
+ tableName,
+ filePath,
+ Seq(),
+ optionsMap,
isOverwrite.isDefined)
}
@@ -262,7 +270,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
(WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",")) <~ ")" ~
opt(";") ^^ {
case dbName ~ tableName ~ loadids =>
- DeleteLoadsById(loadids, dbName, tableName.toLowerCase())
+ DeleteLoadByIdCommand(loadids, dbName, tableName.toLowerCase())
}
protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
@@ -272,7 +280,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
case database ~ table ~ condition =>
condition match {
case dateField ~ dateValue =>
- DeleteLoadsByLoadDate(convertDbNameToLowerCase(database),
+ DeleteLoadByLoadDateCommand(convertDbNameToLowerCase(database),
table.toLowerCase(),
dateField,
dateValue)
@@ -282,14 +290,15 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
protected lazy val cleanFiles: Parser[LogicalPlan] =
CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
case databaseName ~ tableName =>
- CleanFiles(convertDbNameToLowerCase(databaseName), tableName.toLowerCase())
+ CleanFilesCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase())
}
protected lazy val explainPlan: Parser[LogicalPlan] =
(EXPLAIN ~> opt(EXTENDED)) ~ startCommand ^^ {
case isExtended ~ logicalPlan =>
logicalPlan match {
- case plan: CreateTable => ExplainCommand(logicalPlan, extended = isExtended.isDefined)
+ case _: CarbonCreateTableCommand =>
+ ExplainCommand(logicalPlan, extended = isExtended.isDefined)
case _ => ExplainCommand(OneRowRelation)
}
}
@@ -317,7 +326,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
table.toLowerCase,
columnName.toLowerCase,
columnNameCopy.toLowerCase)
- AlterTableDataTypeChange(alterTableChangeDataTypeModel)
+ AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
}
protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
@@ -386,7 +395,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
tableModel.dimCols,
tableModel.msrCols,
tableModel.highcardinalitydims.getOrElse(Seq.empty))
- AlterTableAddColumns(alterTableAddColumnsModel)
+ AlterTableAddColumnCommand(alterTableAddColumnsModel)
}
private def checkFieldDefaultValue(fieldName: String, defaultValueColumnName: String): Boolean = {
@@ -410,7 +419,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
val alterTableDropColumnModel = AlterTableDropColumnModel(convertDbNameToLowerCase(dbName),
table.toLowerCase,
values.map(_.toLowerCase))
- AlterTableDropColumns(alterTableDropColumnModel)
+ AlterTableDropColumnCommand(alterTableDropColumnModel)
}
def getFields(schema: Seq[StructField]): Seq[Field] = {
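
A minimal sketch of what the renames mean for callers of this parser. The method body shown at the top of the first hunk stamps the original SQL string onto the returned command; assuming the public entry point is parse(input: String) (the enclosing method name is cut off by the hunk boundary), usage would look roughly like:

    import org.apache.spark.sql.execution.command.management.LoadTableCommand
    import org.apache.spark.sql.parser.CarbonSpark2SqlParser

    val parser = new CarbonSpark2SqlParser()
    // hypothetical statement; any LOAD DATA ... INTO TABLE form accepted by the grammar
    parser.parse("LOAD DATA INPATH '/tmp/sample.csv' INTO TABLE t1") match {
      case load: LoadTableCommand =>
        // the parser records the raw SQL on the command, per the match above
        println(load.inputSqlString)
      case other =>
        // non-load plans fall through unchanged
        println(other)
    }
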
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index b4389a6..52008f2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCommand, Field, PartitionerField, TableModel}
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
@@ -146,7 +146,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
tableProperties,
bucketFields)
- CreateTable(tableModel)
+ CarbonCreateTableCommand(tableModel)
} else {
super.visitCreateTable(ctx)
}
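
The AST builder change is the same rename in a single spot: the TableModel assembled by visitCreateTable is now wrapped in CarbonCreateTableCommand instead of CreateTable. A one-line sketch (tableModel is whatever visitCreateTable built; constructor arity taken from the call above, and the command doubling as a LogicalPlan is an assumption based on Spark's RunnableCommand convention):

    // illustrative; tableModel: TableModel built earlier in visitCreateTable
    val plan: LogicalPlan = CarbonCreateTableCommand(tableModel)
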
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index d00cb84..91121ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -17,7 +17,8 @@
package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableCompaction, AlterTableModel}
+import org.apache.spark.sql.execution.command.AlterTableModel
+import org.apache.spark.sql.execution.command.management.AlterTableCompactionCommand
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.processing.merger.CompactionType
@@ -33,7 +34,7 @@ object Compaction {
TableAPIUtil.validateTableExists(spark, dbName, tableName)
if (compactionType.equalsIgnoreCase(CarbonCommonConstants.MAJOR) ||
compactionType.equalsIgnoreCase(CarbonCommonConstants.MINOR)) {
- AlterTableCompaction(AlterTableModel(Some(dbName),
+ AlterTableCompactionCommand(AlterTableModel(Some(dbName),
tableName,
None,
compactionType,
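
Outside the parser, the table API picks up the same rename. A hedged sketch of triggering a compaction directly, following the six-argument AlterTableModel construction visible in the parser hunk above (spark is assumed to be a live SparkSession; database and table names are hypothetical; the run(spark) call mirrors the LoadTableCommand invocation in TableLoader below and is assumed to apply here too):

    import org.apache.spark.sql.execution.command.AlterTableModel
    import org.apache.spark.sql.execution.command.management.AlterTableCompactionCommand
    import org.apache.carbondata.core.constants.CarbonCommonConstants

    val model = AlterTableModel(
      Some("default"),                  // database name (hypothetical)
      "t1",                             // table name (hypothetical)
      None,                             // mirrors the parser's construction
      CarbonCommonConstants.MAJOR,      // compaction type: MAJOR or MINOR
      Some(System.currentTimeMillis()),
      null)                             // alterSql; the parser path fills this in
    AlterTableCompactionCommand(model).run(spark)
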
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 72c7426..501402b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -24,7 +24,7 @@ import scala.collection.{immutable, mutable}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.command.LoadTable
+import org.apache.spark.sql.execution.command.management.LoadTableCommand
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
@@ -61,7 +61,7 @@ object TableLoader {
def loadTable(spark: SparkSession, dbName: Option[String], tableName: String, inputPaths: String,
options: scala.collection.immutable.Map[String, String]): Unit = {
- LoadTable(dbName, tableName, inputPaths, Nil, options, false).run(spark)
+ LoadTableCommand(dbName, tableName, inputPaths, Nil, options, false).run(spark)
}
def main(args: Array[String]): Unit = {
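
And a hedged usage example of the refactored helper, which now delegates to LoadTableCommand(...).run(spark) as shown above (all names are hypothetical; the argument list follows the loadTable signature in this hunk):

    // illustrative only; assumes a live SparkSession named spark
    TableLoader.loadTable(
      spark,
      Some("default"),             // database name (hypothetical)
      "t1",                        // table name (hypothetical)
      "hdfs:///tmp/input.csv",     // inputPaths
      Map("DELIMITER" -> ","))     // load options (hypothetical keys)
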
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
index db62eb5..d53065f 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
@@ -19,7 +19,7 @@ package org.apache.spark.carbondata.vectorreader
import org.apache.spark.sql.Row
import org.apache.spark.sql.common.util.Spark2QueryTest
-import org.apache.spark.sql.execution.command.LoadTable
+import org.apache.spark.sql.execution.command.management.LoadTableCommand
import org.apache.spark.sql.execution.{BatchedDataSourceScanExec, RowDataSourceScanExec}
import org.scalatest.BeforeAndAfterAll