You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:07:54 UTC

[07/50] [abbrv] carbondata git commit: [CARBONDATA-1151] Refactor all carbon command to separate file in spark2 integration

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 24b2981..9f4a8ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -21,11 +21,14 @@ import scala.collection.mutable
 import scala.language.implicitConversions
 
 import org.apache.spark.sql.{DeleteRecords, ShowLoadsCommand, UpdateTable}
-import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.CarbonDDLSqlParser
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand, DeleteLoadByLoadDateCommand, LoadTableCommand}
+import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand}
+import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand}
 import org.apache.spark.sql.types.StructField
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -45,10 +48,10 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
       initLexical
       phrase(start)(new lexical.Scanner(input)) match {
         case Success(plan, _) => plan match {
-          case x: LoadTable =>
+          case x: LoadTableCommand =>
             x.inputSqlString = input
             x
-          case x: AlterTableCompaction =>
+          case x: AlterTableCompactionCommand =>
             x.alterTableModel.alterSql = input
             x
           case logicalPlan => logicalPlan
@@ -80,7 +83,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
       case dbName ~ table ~ addInfo =>
         val alterTableAddPartitionModel =
           AlterTableSplitPartitionModel(dbName, table, "0", addInfo)
-        AlterTableSplitPartitionCommand(alterTableAddPartitionModel)
+        AlterTableSplitCarbonPartitionCommand(alterTableAddPartitionModel)
     }
 
   protected lazy val alterSplitPartition: Parser[LogicalPlan] =
@@ -92,7 +95,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         if (partitionId == 0) {
           sys.error("Please use [Alter Table Add Partition] statement to split default partition!")
         }
-        AlterTableSplitPartitionCommand(alterTableSplitPartitionModel)
+        AlterTableSplitCarbonPartitionCommand(alterTableSplitPartitionModel)
     }
 
   protected lazy val alterDropPartition: Parser[LogicalPlan] =
@@ -105,7 +108,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         }
         val alterTableDropPartitionModel =
           AlterTableDropPartitionModel(dbName, table, partitionId, dropWithData)
-        AlterTableDropPartition(alterTableDropPartitionModel)
+        AlterTableDropCarbonPartitionCommand(alterTableDropPartitionModel)
     }
 
 
@@ -115,7 +118,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         val altertablemodel =
           AlterTableModel(convertDbNameToLowerCase(dbName), table, None, compactType,
           Some(System.currentTimeMillis()), null)
-        AlterTableCompaction(altertablemodel)
+        AlterTableCompactionCommand(altertablemodel)
     }
 
   protected lazy val deleteRecords: Parser[LogicalPlan] =
@@ -253,7 +256,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           validateOptions(optionsList)
         }
         val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
-        LoadTable(convertDbNameToLowerCase(databaseNameOp), tableName, filePath, Seq(), optionsMap,
+        LoadTableCommand(
+          convertDbNameToLowerCase(databaseNameOp),
+          tableName,
+          filePath,
+          Seq(),
+          optionsMap,
           isOverwrite.isDefined)
     }
 
@@ -262,7 +270,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",")) <~ ")" ~
     opt(";") ^^ {
       case dbName ~ tableName ~ loadids =>
-        DeleteLoadsById(loadids, dbName, tableName.toLowerCase())
+        DeleteLoadByIdCommand(loadids, dbName, tableName.toLowerCase())
     }
 
   protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
@@ -272,7 +280,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
       case database ~ table ~ condition =>
         condition match {
           case dateField ~ dateValue =>
-            DeleteLoadsByLoadDate(convertDbNameToLowerCase(database),
+            DeleteLoadByLoadDateCommand(convertDbNameToLowerCase(database),
               table.toLowerCase(),
               dateField,
               dateValue)
@@ -282,14 +290,15 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val cleanFiles: Parser[LogicalPlan] =
     CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
       case databaseName ~ tableName =>
-        CleanFiles(convertDbNameToLowerCase(databaseName), tableName.toLowerCase())
+        CleanFilesCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase())
     }
 
   protected lazy val explainPlan: Parser[LogicalPlan] =
     (EXPLAIN ~> opt(EXTENDED)) ~ startCommand ^^ {
       case isExtended ~ logicalPlan =>
         logicalPlan match {
-          case plan: CreateTable => ExplainCommand(logicalPlan, extended = isExtended.isDefined)
+          case _: CarbonCreateTableCommand =>
+            ExplainCommand(logicalPlan, extended = isExtended.isDefined)
           case _ => ExplainCommand(OneRowRelation)
         }
     }
@@ -317,7 +326,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
             table.toLowerCase,
             columnName.toLowerCase,
             columnNameCopy.toLowerCase)
-        AlterTableDataTypeChange(alterTableChangeDataTypeModel)
+        AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
     }
 
   protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
@@ -386,7 +395,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           tableModel.dimCols,
           tableModel.msrCols,
           tableModel.highcardinalitydims.getOrElse(Seq.empty))
-        AlterTableAddColumns(alterTableAddColumnsModel)
+        AlterTableAddColumnCommand(alterTableAddColumnsModel)
     }
 
   private def checkFieldDefaultValue(fieldName: String, defaultValueColumnName: String): Boolean = {
@@ -410,7 +419,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         val alterTableDropColumnModel = AlterTableDropColumnModel(convertDbNameToLowerCase(dbName),
           table.toLowerCase,
           values.map(_.toLowerCase))
-        AlterTableDropColumns(alterTableDropColumnModel)
+        AlterTableDropColumnCommand(alterTableDropColumnModel)
     }
 
   def getFields(schema: Seq[StructField]): Seq[Field] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index b4389a6..52008f2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCommand, Field, PartitionerField, TableModel}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 
 import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
@@ -146,7 +146,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
         tableProperties,
         bucketFields)
 
-      CreateTable(tableModel)
+      CarbonCreateTableCommand(tableModel)
     } else {
       super.visitCreateTable(ctx)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index d00cb84..91121ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -17,7 +17,8 @@
 package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableCompaction, AlterTableModel}
+import org.apache.spark.sql.execution.command.AlterTableModel
+import org.apache.spark.sql.execution.command.management.AlterTableCompactionCommand
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.processing.merger.CompactionType
@@ -33,7 +34,7 @@ object Compaction {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
     if (compactionType.equalsIgnoreCase(CarbonCommonConstants.MAJOR) ||
         compactionType.equalsIgnoreCase(CarbonCommonConstants.MINOR)) {
-      AlterTableCompaction(AlterTableModel(Some(dbName),
+      AlterTableCompactionCommand(AlterTableModel(Some(dbName),
         tableName,
         None,
         compactionType,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 72c7426..501402b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -24,7 +24,7 @@ import scala.collection.{immutable, mutable}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.command.LoadTable
+import org.apache.spark.sql.execution.command.management.LoadTableCommand
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
@@ -61,7 +61,7 @@ object TableLoader {
 
   def loadTable(spark: SparkSession, dbName: Option[String], tableName: String, inputPaths: String,
       options: scala.collection.immutable.Map[String, String]): Unit = {
-    LoadTable(dbName, tableName, inputPaths, Nil, options, false).run(spark)
+    LoadTableCommand(dbName, tableName, inputPaths, Nil, options, false).run(spark)
   }
 
   def main(args: Array[String]): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
index db62eb5..d53065f 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
@@ -19,7 +19,7 @@ package org.apache.spark.carbondata.vectorreader
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.Spark2QueryTest
-import org.apache.spark.sql.execution.command.LoadTable
+import org.apache.spark.sql.execution.command.management.LoadTableCommand
 import org.apache.spark.sql.execution.{BatchedDataSourceScanExec, RowDataSourceScanExec}
 import org.scalatest.BeforeAndAfterAll