You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/27 01:19:08 UTC
[1/4] incubator-carbondata git commit: Initial commit
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 28190eb71 -> e8dcd4296
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
new file mode 100644
index 0000000..9a3f828
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.parser
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.catalog.CatalogColumn
+import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
+import org.apache.spark.sql.catalyst.parser.ParserUtils._
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{ColTypeListContext, CreateTableContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.{CreateTable, Field, TableModel}
+import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Concrete parser for Spark SQL statements and carbon specific statements: input is parsed
+ * with Spark's grammar first and handed to the carbon parser when that attempt fails.
+ */
+class CarbonSparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
+
+ val astBuilder = new CarbonSqlAstBuilder(conf)
+
+ private val substitutor = new VariableSubstitution(conf)
+
+ protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
+ super.parse(substitutor.substitute(command))(toResult)
+ }
+
+ override def parsePlan(sqlText: String): LogicalPlan = {
+ // Spark's grammar gets the first attempt; only non-fatal failures fall back to the
+ // carbon parser, so fatal JVM errors (e.g. OutOfMemoryError) still propagate.
+ try {
+ super.parsePlan(sqlText)
+ } catch { case scala.util.control.NonFatal(_) => astBuilder.parser.parse(sqlText) }
+ }
+}
+
+class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
+
+ val parser = new CarbonSpark2SqlParser
+
+ /**
+ * Builds a carbon [[CreateTable]] plan when the statement is STORED BY 'carbondata' (or
+ * 'org.apache.carbondata.format'); every other CREATE TABLE is delegated to Spark's builder.
+ */
+ override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+ // createFileFormat may carry no storage handler (e.g. STORED AS ...), so null-guard it.
+ val fileStorage = Option(ctx.createFileFormat)
+ .flatMap(format => Option(format.storageHandler()))
+ .map(_.STRING().getSymbol.getText).getOrElse("")
+ if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+ fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+ val (name, temp, ifNotExists, _) = visitCreateTableHeader(ctx.createTableHeader)
+ // TODO: implement temporary tables
+ if (temp) {
+ throw new ParseException(
+ "CREATE TEMPORARY TABLE is not supported yet. " +
+ "Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
+ }
+ if (ctx.skewSpec != null) {
+ operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
+ }
+ if (ctx.bucketSpec != null) {
+ operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
+ }
+ val comment = Option(ctx.STRING).map(string)
+ val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
+ val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
+ val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
+ .getOrElse(Map.empty)
+
+ // Ensuring whether no duplicate name is used in table definition
+ val colNames = cols.map(_.name)
+ if (colNames.length != colNames.distinct.length) {
+ val duplicateColumns = colNames.groupBy(identity).collect {
+ case (x, ys) if ys.length > 1 => "\"" + x + "\""
+ }
+ operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
+ duplicateColumns.mkString("[", ",", "]"), ctx)
+ }
+
+ // For Hive tables, partition columns must not be part of the schema
+ val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
+ if (badPartCols.nonEmpty) {
+ operationNotAllowed(s"Partition columns may not be specified in the schema: " +
+ badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
+ }
+
+ // Note: Hive requires partition columns to be distinct from the schema, so we need
+ // to include the partition columns here explicitly
+ val schema = cols ++ partitionCols
+
+ val fields = schema.map { col =>
+ val x = col.name + ' ' + col.dataType
+ val f: Field = parser.anyFieldDef(new parser.lexical.Scanner(x)) match {
+ case parser.Success(field, _) => field.asInstanceOf[Field]
+ case failureOrError => throw new MalformedCarbonCommandException(
+ s"Unsupported data type: ${col.dataType}")
+ }
+ // the data type of the decimal type will be like decimal(10,0)
+ // so checking the start of the string and taking the precision and scale.
+ // resetting the data type with decimal
+ if (f.dataType.getOrElse("").startsWith("decimal")) {
+ val (precision, scale) = parser.getScaleAndPrecision(col.dataType)
+ f.precision = precision
+ f.scale = scale
+ f.dataType = Some("decimal")
+ }
+ if (f.dataType.getOrElse("").startsWith("char")) {
+ f.dataType = Some("char")
+ }
+ f.rawSchema = x
+ f
+ }
+
+ // validate tblProperties
+ if (!CommonUtil.validateTblProperties(properties.asJava.asScala, fields)) {
+ throw new MalformedCarbonCommandException("Invalid table properties")
+ }
+ // prepare table model of the collected tokens
+ val tableModel: TableModel = parser.prepareTableModel(ifNotExists, name.database,
+ name.table, fields, Seq(), properties.asJava.asScala)
+
+ CreateTable(tableModel)
+ } else {
+ super.visitCreateTable(ctx)
+ }
+ }
+
+ /**
+ * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
+ */
+ private def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
+ val props = visitTablePropertyList(ctx)
+ val badKeys = props.filter { case (_, v) => v == null }.keys
+ if (badKeys.nonEmpty) {
+ operationNotAllowed(
+ s"Values must be specified for key(s): ${ badKeys.mkString("[", ",", "]") }", ctx)
+ }
+ props
+ }
+
+ /** Converts each column of a [[ColTypeListContext]] into a [[CatalogColumn]]. */
+ private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = {
+ withOrigin(ctx) {
+ ctx.colType.asScala.map { col =>
+ CatalogColumn(
+ col.identifier.getText.toLowerCase,
+ // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
+ // just convert the whole type string to lower case, otherwise the struct field names
+ // will no longer be case sensitive. Instead, we rely on our parser to get the proper
+ // case before passing it to Hive.
+ typedVisit[DataType](col.dataType).catalogString,
+ nullable = true,
+ Option(col.STRING).map(string))
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index c84882e..399b3e6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -43,7 +43,7 @@ object CleanFiles {
val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
- CarbonEnv.init(spark.sqlContext)
+ CarbonEnv.init(spark)
cleanFiles(spark, dbName, tableName, storePath)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 1e891fd..2db6e48 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -41,7 +41,7 @@ object Compaction {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val compactionType = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
- CarbonEnv.init(spark.sqlContext)
+ CarbonEnv.init(spark)
compaction(spark, dbName, tableName, compactionType)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index ae95bf6..951cd7f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -43,7 +43,7 @@ object DeleteSegmentByDate {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val dateValue = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
- CarbonEnv.init(spark.sqlContext)
+ CarbonEnv.init(spark)
deleteSegmentByDate(spark, dbName, tableName, dateValue)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index d5a6861..dad9f59 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -48,7 +48,7 @@ object DeleteSegmentById {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
- CarbonEnv.init(spark.sqlContext)
+ CarbonEnv.init(spark)
deleteSegmentById(spark, dbName, tableName, segmentIds)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index 1a02c8c..c953089 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -75,7 +75,7 @@ object ShowSegments {
None
}
val spark = TableAPIUtil.spark(storePath, s"ShowSegments: $dbName.$tableName")
- CarbonEnv.init(spark.sqlContext)
+ CarbonEnv.init(spark)
val rows = showSegments(spark, dbName, tableName, limit)
System.out.println(showString(rows))
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 8b10aa4..424d8fa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -86,7 +86,7 @@ object TableLoader {
val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
- CarbonEnv.init(spark.sqlContext)
+ CarbonEnv.init(spark)
loadTable(spark, Option(dbName), tableName, inputPaths, map)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 45dcb03..4310d04 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -54,6 +54,11 @@ class QueryTest extends PlanTest {
clean(metastoredb)
}
+ CarbonProperties.getInstance()
+ .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+ .addProperty("carbon.storelocation", storeLocation)
+
+ import org.apache.spark.sql.CarbonSession._
val spark = SparkSession
.builder()
.master("local")
@@ -62,17 +67,13 @@ class QueryTest extends PlanTest {
.config("spark.sql.warehouse.dir", warehouse)
.config("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastoredb;create=true")
- .getOrCreate()
-
- CarbonProperties.getInstance()
- .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
- .addProperty("carbon.storelocation", storeLocation)
+ .getOrCreateCarbonSession()
spark.sparkContext.setLogLevel("WARN")
spark
}
- val sc = spark.sparkContext
+ val Dsc = spark.sparkContext
lazy val implicits = spark.implicits
[4/4] incubator-carbondata git commit: [CARBONDATA-547] Added
CarbonSession and enabled parser to use carbon custom commands This closes
#448
Posted by ja...@apache.org.
[CARBONDATA-547] Added CarbonSession and enabled parser to use carbon custom commands This closes #448
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e8dcd429
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e8dcd429
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e8dcd429
Branch: refs/heads/master
Commit: e8dcd4296b3eec453f100290afe39489edbd151f
Parents: 28190eb bedc96d
Author: jackylk <ja...@huawei.com>
Authored: Tue Dec 27 09:17:42 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Dec 27 09:17:42 2016 +0800
----------------------------------------------------------------------
.../carbondata/examples/CarbonExample.scala | 173 ----
.../examples/CarbonSessionExample.scala | 144 +++
.../examples/SparkSessionExample.scala | 173 ++++
.../catalyst/AbstractCarbonSparkSQLParser.scala | 137 +++
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 968 +++++++++++++++++++
.../execution/command/carbonTableSchema.scala | 8 +-
.../org/apache/spark/sql/CarbonSqlParser.scala | 938 +-----------------
.../execution/command/carbonTableSchema.scala | 2 +-
.../spark/sql/CarbonCatalystOperators.scala | 36 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 4 +-
.../org/apache/spark/sql/CarbonSession.scala | 133 +++
.../org/apache/spark/sql/CarbonSource.scala | 12 +-
.../execution/CarbonLateDecodeStrategy.scala | 6 +-
.../sql/execution/command/DDLStrategy.scala | 83 ++
.../execution/command/carbonTableSchema.scala | 319 +++++-
.../apache/spark/sql/hive/CarbonMetastore.scala | 9 +
.../spark/sql/hive/CarbonSessionState.scala | 38 +
.../sql/parser/CarbonSpark2SqlParser.scala | 125 +++
.../spark/sql/parser/CarbonSparkSqlParser.scala | 178 ++++
.../org/apache/spark/util/CleanFiles.scala | 2 +-
.../org/apache/spark/util/Compaction.scala | 2 +-
.../apache/spark/util/DeleteSegmentByDate.scala | 2 +-
.../apache/spark/util/DeleteSegmentById.scala | 2 +-
.../org/apache/spark/util/ShowSegments.scala | 2 +-
.../org/apache/spark/util/TableLoader.scala | 2 +-
.../spark/carbondata/util/QueryTest.scala | 0
.../sql/common/util/CarbonSessionTest.scala | 0
.../spark/sql/common/util/QueryTest.scala | 13 +-
28 files changed, 2366 insertions(+), 1145 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-carbondata git commit: Initial commit
Posted by ja...@apache.org.
Initial commit
Added comments
Fixed style and testcases
Refactored code
Fixed issue
Rebased
Rebased
fixed comments
fixed style
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/bedc96d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/bedc96d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/bedc96d0
Branch: refs/heads/master
Commit: bedc96d059fe25ebec298220b92243e87c496a0c
Parents: 28190eb
Author: ravipesala <ra...@gmail.com>
Authored: Mon Dec 19 18:57:11 2016 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Tue Dec 27 08:48:59 2016 +0800
----------------------------------------------------------------------
.../carbondata/examples/CarbonExample.scala | 173 ----
.../examples/CarbonSessionExample.scala | 144 +++
.../examples/SparkSessionExample.scala | 173 ++++
.../catalyst/AbstractCarbonSparkSQLParser.scala | 137 +++
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 968 +++++++++++++++++++
.../execution/command/carbonTableSchema.scala | 8 +-
.../org/apache/spark/sql/CarbonSqlParser.scala | 938 +-----------------
.../execution/command/carbonTableSchema.scala | 2 +-
.../spark/sql/CarbonCatalystOperators.scala | 36 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 4 +-
.../org/apache/spark/sql/CarbonSession.scala | 133 +++
.../org/apache/spark/sql/CarbonSource.scala | 12 +-
.../execution/CarbonLateDecodeStrategy.scala | 6 +-
.../sql/execution/command/DDLStrategy.scala | 83 ++
.../execution/command/carbonTableSchema.scala | 319 +++++-
.../apache/spark/sql/hive/CarbonMetastore.scala | 9 +
.../spark/sql/hive/CarbonSessionState.scala | 38 +
.../sql/parser/CarbonSpark2SqlParser.scala | 125 +++
.../spark/sql/parser/CarbonSparkSqlParser.scala | 178 ++++
.../org/apache/spark/util/CleanFiles.scala | 2 +-
.../org/apache/spark/util/Compaction.scala | 2 +-
.../apache/spark/util/DeleteSegmentByDate.scala | 2 +-
.../apache/spark/util/DeleteSegmentById.scala | 2 +-
.../org/apache/spark/util/ShowSegments.scala | 2 +-
.../org/apache/spark/util/TableLoader.scala | 2 +-
.../spark/carbondata/util/QueryTest.scala | 0
.../sql/common/util/CarbonSessionTest.scala | 0
.../spark/sql/common/util/QueryTest.scala | 13 +-
28 files changed, 2366 insertions(+), 1145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
deleted file mode 100644
index 273de95..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.examples
-
-import java.io.File
-
-import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.{CleanFiles, ShowSegments}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-object CarbonExample {
-
- def main(args: Array[String]): Unit = {
- val rootPath = new File(this.getClass.getResource("/").getPath
- + "../../../..").getCanonicalPath
- val storeLocation = s"$rootPath/examples/spark2/target/store"
- val warehouse = s"$rootPath/examples/spark2/target/warehouse"
- val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
-
- // clean data folder
- if (true) {
- val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
- clean(storeLocation)
- clean(warehouse)
- clean(metastoredb)
- }
-
- val spark = SparkSession
- .builder()
- .master("local")
- .appName("CarbonExample")
- .enableHiveSupport()
- .config("spark.sql.warehouse.dir", warehouse)
- .config("javax.jdo.option.ConnectionURL",
- s"jdbc:derby:;databaseName=$metastoredb;create=true")
- .getOrCreate()
-
- CarbonProperties.getInstance()
- .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
- .addProperty("carbon.storelocation", storeLocation)
-
- spark.sparkContext.setLogLevel("WARN")
-
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
-
- // Create table
- spark.sql(
- s"""
- | CREATE TABLE carbon_table(
- | shortField short,
- | intField int,
- | bigintField long,
- | doubleField double,
- | stringField string,
- | timestampField timestamp,
- | decimalField decimal(18,2),
- | dateField date,
- | charField char(5)
- | )
- | USING org.apache.spark.sql.CarbonSource
- | OPTIONS('DICTIONARY_INCLUDE'='dateField, charField',
- | 'dbName'='default', 'tableName'='carbon_table')
- """.stripMargin)
-
- // val prop = s"$rootPath/conf/dataload.properties.template"
- // val tableName = "carbon_table"
- val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
- // TableLoader.main(Array[String](prop, tableName, path))
-
- spark.sql(
- s"""
- | CREATE TABLE csv_table
- | ( shortField short,
- | intField int,
- | bigintField long,
- | doubleField double,
- | stringField string,
- | timestampField string,
- | decimalField decimal(18,2),
- | dateField string,
- | charField char(5))
- | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
- """.stripMargin)
-
- spark.sql(
- s"""
- | LOAD DATA LOCAL INPATH '$path'
- | INTO TABLE csv_table
- """.stripMargin)
-
- spark.sql("""
- SELECT *
- FROM csv_table
- """).show
-
- spark.sql(
- s"""
- | INSERT INTO TABLE carbon_table
- | SELECT shortField, intField, bigintField, doubleField, stringField,
- | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField, decimalField,
- | cast(to_date(from_unixtime(unix_timestamp(dateField,'yyyy/M/dd'))) as date), charField
- | FROM csv_table
- """.stripMargin)
-
- spark.sql("""
- SELECT *
- FROM carbon_table
- where stringfield = 'spark' and decimalField > 40
- """).show
-
- spark.sql("""
- SELECT *
- FROM carbon_table where length(stringField) = 5
- """).show
-
- spark.sql("""
- SELECT *
- FROM carbon_table where date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
- """).show
-
- spark.sql("""
- select count(stringField) from carbon_table
- """.stripMargin).show
-
- spark.sql("""
- SELECT sum(intField), stringField
- FROM carbon_table
- GROUP BY stringField
- """).show
-
- spark.sql(
- """
- |select t1.*, t2.*
- |from carbon_table t1, carbon_table t2
- |where t1.stringField = t2.stringField
- """.stripMargin).show
-
- spark.sql(
- """
- |with t1 as (
- |select * from carbon_table
- |union all
- |select * from carbon_table
- |)
- |select t1.*, t2.*
- |from t1, carbon_table t2
- |where t1.stringField = t2.stringField
- """.stripMargin).show
-
- // Drop table
- spark.sql("DROP TABLE IF EXISTS carbon_table")
- spark.sql("DROP TABLE IF EXISTS csv_table")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
new file mode 100644
index 0000000..4923e5b
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/** Example: creates, loads and queries a carbon table through a carbon session. */
+object CarbonSessionExample {
+
+ def main(args: Array[String]): Unit = {
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val storeLocation = s"$rootPath/examples/spark2/target/store"
+ val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+ val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+ // clean data folder
+ val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+ clean(storeLocation)
+ clean(warehouse)
+ clean(metastoredb)
+
+ // carbon properties are registered before the session is built below
+ CarbonProperties.getInstance()
+ .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+ .addProperty("carbon.storelocation", storeLocation)
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+ import org.apache.spark.sql.CarbonSession._
+
+ val spark = SparkSession
+ .builder()
+ .master("local")
+ .appName("CarbonExample")
+ .enableHiveSupport()
+ .config("spark.sql.warehouse.dir", warehouse)
+ .config("javax.jdo.option.ConnectionURL",
+ s"jdbc:derby:;databaseName=$metastoredb;create=true")
+ .getOrCreateCarbonSession()
+
+ spark.sparkContext.setLogLevel("WARN")
+
+ spark.sql("DROP TABLE IF EXISTS carbon_table")
+
+ // Create table
+ spark.sql(
+ s"""
+ | CREATE TABLE carbon_table(
+ | shortField short,
+ | intField int,
+ | bigintField long,
+ | doubleField double,
+ | stringField string,
+ | timestampField timestamp,
+ | decimalField decimal(18,2),
+ | dateField date,
+ | charField char(5)
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
+ """.stripMargin)
+
+ val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
+
+ // scalastyle:off
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$path'
+ | INTO TABLE carbon_table
+ | options('FILEHEADER'='shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField')
+ """.stripMargin)
+ // scalastyle:on
+
+ spark.sql("""
+ SELECT *
+ FROM carbon_table
+ where stringfield = 'spark' and decimalField > 40
+ """).show
+
+ spark.sql("""
+ SELECT *
+ FROM carbon_table where length(stringField) = 5
+ """).show
+
+ spark.sql("""
+ SELECT *
+ FROM carbon_table where date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
+ """).show
+
+ spark.sql("""
+ select count(stringField) from carbon_table
+ """.stripMargin).show
+
+ spark.sql("""
+ SELECT sum(intField), stringField
+ FROM carbon_table
+ GROUP BY stringField
+ """).show
+
+ spark.sql(
+ """
+ |select t1.*, t2.*
+ |from carbon_table t1, carbon_table t2
+ |where t1.stringField = t2.stringField
+ """.stripMargin).show
+
+ spark.sql(
+ """
+ |with t1 as (
+ |select * from carbon_table
+ |union all
+ |select * from carbon_table
+ |)
+ |select t1.*, t2.*
+ |from t1, carbon_table t2
+ |where t1.stringField = t2.stringField
+ """.stripMargin).show
+
+ // Drop table
+ spark.sql("DROP TABLE IF EXISTS carbon_table")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
new file mode 100644
index 0000000..2affbe2
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.examples
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{CleanFiles, ShowSegments}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+ /**
+ * Example program: creates a table through the org.apache.spark.sql.CarbonSource data source,
+ * fills it from a comma-delimited staging table (csv_table) and runs several queries on it.
+ * Store, warehouse and metastore directories are placed under examples/spark2/target of the
+ * resolved root path and are deleted at the start of every run.
+ */
+ object SparkSessionExample {
+
+ def main(args: Array[String]): Unit = {
+ // Root directory: four levels above the class-path root this class was loaded from.
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val storeLocation = s"$rootPath/examples/spark2/target/store"
+ val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+ val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+ // clean data folder
+ // The constant condition makes the cleanup unconditional; output of earlier runs is removed.
+ if (true) {
+ val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+ clean(storeLocation)
+ clean(warehouse)
+ clean(metastoredb)
+ }
+
+ // Local-mode session with Hive support; the Derby metastore database lives in metastoredb.
+ val spark = SparkSession
+ .builder()
+ .master("local")
+ .appName("CarbonExample")
+ .enableHiveSupport()
+ .config("spark.sql.warehouse.dir", warehouse)
+ .config("javax.jdo.option.ConnectionURL",
+ s"jdbc:derby:;databaseName=$metastoredb;create=true")
+ .getOrCreate()
+
+ // Register the kettle plugin directory and the store location as Carbon properties.
+ CarbonProperties.getInstance()
+ .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+ .addProperty("carbon.storelocation", storeLocation)
+
+ spark.sparkContext.setLogLevel("WARN")
+
+ // Timestamp format registered with Carbon for this run.
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+ // Create table
+ spark.sql(
+ s"""
+ | CREATE TABLE carbon_table(
+ | shortField short,
+ | intField int,
+ | bigintField long,
+ | doubleField double,
+ | stringField string,
+ | timestampField timestamp,
+ | decimalField decimal(18,2),
+ | dateField date,
+ | charField char(5)
+ | )
+ | USING org.apache.spark.sql.CarbonSource
+ | OPTIONS('DICTIONARY_INCLUDE'='dateField, charField',
+ | 'dbName'='default', 'tableName'='carbon_table')
+ """.stripMargin)
+
+ // val prop = s"$rootPath/conf/dataload.properties.template"
+ // val tableName = "carbon_table"
+ val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
+ // TableLoader.main(Array[String](prop, tableName, path))
+
+ // Staging table over comma-delimited text. timestampField and dateField are declared as
+ // strings here and converted when the rows are copied into carbon_table below.
+ spark.sql(
+ s"""
+ | CREATE TABLE csv_table
+ | ( shortField short,
+ | intField int,
+ | bigintField long,
+ | doubleField double,
+ | stringField string,
+ | timestampField string,
+ | decimalField decimal(18,2),
+ | dateField string,
+ | charField char(5))
+ | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ """.stripMargin)
+
+ // Load the sample CSV file into the staging table.
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$path'
+ | INTO TABLE csv_table
+ """.stripMargin)
+
+ // Show the raw staged rows.
+ spark.sql("""
+ SELECT *
+ FROM csv_table
+ """).show
+
+ // Copy the staged rows into carbon_table, parsing the string timestamp and date columns
+ // with the 'yyyy/M/dd' pattern.
+ spark.sql(
+ s"""
+ | INSERT INTO TABLE carbon_table
+ | SELECT shortField, intField, bigintField, doubleField, stringField,
+ | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField, decimalField,
+ | cast(to_date(from_unixtime(unix_timestamp(dateField,'yyyy/M/dd'))) as date), charField
+ | FROM csv_table
+ """.stripMargin)
+
+ // Filter on a string column and a decimal column.
+ spark.sql("""
+ SELECT *
+ FROM carbon_table
+ where stringfield = 'spark' and decimalField > 40
+ """).show
+
+ // Filter on a function of a string column.
+ spark.sql("""
+ SELECT *
+ FROM carbon_table where length(stringField) = 5
+ """).show
+
+ // Filter on a formatted date column.
+ spark.sql("""
+ SELECT *
+ FROM carbon_table where date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
+ """).show
+
+ // Count query. The literal contains no '|' margin characters, so stripMargin leaves it as is.
+ spark.sql("""
+ select count(stringField) from carbon_table
+ """.stripMargin).show
+
+ // Aggregation with GROUP BY.
+ spark.sql("""
+ SELECT sum(intField), stringField
+ FROM carbon_table
+ GROUP BY stringField
+ """).show
+
+ // Self join on the string column.
+ spark.sql(
+ """
+ |select t1.*, t2.*
+ |from carbon_table t1, carbon_table t2
+ |where t1.stringField = t2.stringField
+ """.stripMargin).show
+
+ // Join between a common table expression (union of the table with itself) and the table.
+ spark.sql(
+ """
+ |with t1 as (
+ |select * from carbon_table
+ |union all
+ |select * from carbon_table
+ |)
+ |select t1.*, t2.*
+ |from t1, carbon_table t2
+ |where t1.stringField = t2.stringField
+ """.stripMargin).show
+
+ // Drop table
+ spark.sql("DROP TABLE IF EXISTS carbon_table")
+ spark.sql("DROP TABLE IF EXISTS csv_table")
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/AbstractCarbonSparkSQLParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/AbstractCarbonSparkSQLParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/AbstractCarbonSparkSQLParser.scala
new file mode 100644
index 0000000..fba7976
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/AbstractCarbonSparkSQLParser.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.lexical.StdLexical
+import scala.util.parsing.combinator.syntactical.StandardTokenParsers
+import scala.util.parsing.combinator.PackratParsers
+import scala.util.parsing.input.CharArrayReader.EofCh
+
+import org.apache.spark.sql.catalyst.plans.logical._
+
+ /**
+ * Base class for token-based SQL parsers built on StandardTokenParsers and PackratParsers.
+ * Subclasses supply the `start` rule; reserved words are discovered through reflection over
+ * members of type `Keyword` and installed into the lexer on the first call to `parse`.
+ */
+ private[sql] abstract class AbstractCarbonSparkSQLParser
+ extends StandardTokenParsers with PackratParsers {
+
+ /**
+ * Parses the whole input with the `start` rule.
+ * The call is synchronized on this parser instance.
+ *
+ * @param input text to parse
+ * @return the plan produced by `start` when the complete input is accepted
+ * On any failure or error result, `sys.error` is raised with the result's string form.
+ */
+ def parse(input: String): LogicalPlan = synchronized {
+ // Initialize the Keywords.
+ initLexical
+ phrase(start)(new lexical.Scanner(input)) match {
+ case Success(plan, _) => plan
+ case failureOrError => sys.error(failureOrError.toString)
+ }
+ }
+ /* One-time initialization of the lexer's reserved words; being a lazy val it runs on the
+ first parse call only, so later calls do not re-initialize the lexer. */
+ protected lazy val initLexical: Unit = lexical.initialize(reservedWords)
+
+ /**
+ * A keyword of the grammar. `normalize` delegates to the lexer's keyword normalization and
+ * `parser` turns the normalized text into a Parser[String]; this relies on an implicit
+ * String => Parser[String] conversion in scope (presumably StandardTokenParsers.keyword).
+ */
+ protected case class Keyword(str: String) {
+ def normalize: String = lexical.normalizeKeyword(str)
+ def parser: Parser[String] = normalize
+ }
+
+ // Lets a Keyword be used directly wherever a Parser[String] is expected.
+ protected implicit def asParser(k: Keyword): Parser[String] = k.parser
+
+ // By default, use Reflection to find the reserved words defined in the sub class.
+ // NOTICE, Since the Keyword properties defined by sub class, we couldn't call this
+ // method during the parent class instantiation, because the sub class instance
+ // isn't created yet.
+ // Every public method whose return type is Keyword is invoked on this instance and its
+ // normalized text collected.
+ protected lazy val reservedWords: Seq[String] =
+ this
+ .getClass
+ .getMethods
+ .filter(_.getReturnType == classOf[Keyword])
+ .map(_.invoke(this).asInstanceOf[Keyword].normalize)
+
+ // Set the keywords as empty by default, will change that later.
+ override val lexical = new SqlLexical
+
+ // Entry rule of the grammar, supplied by the concrete parser.
+ protected def start: Parser[LogicalPlan]
+
+ // Returns the whole input string
+ // Always succeeds; the remaining reader is advanced by the full source length.
+ protected lazy val wholeInput: Parser[String] = new Parser[String] {
+ def apply(in: Input): ParseResult[String] =
+ Success(in.source.toString, in.drop(in.source.length()))
+ }
+
+ // Returns the rest of the input string that are not parsed yet
+ // Always succeeds with the text from the current offset to the end of the source; the
+ // remaining reader is advanced by the full source length.
+ protected lazy val restInput: Parser[String] = new Parser[String] {
+ def apply(in: Input): ParseResult[String] =
+ Success(
+ in.source.subSequence(in.offset, in.source.length()).toString,
+ in.drop(in.source.length()))
+ }
+ }
+
+ /**
+ * Lexer for the SQL parsers in this file. It extends StdLexical with a float literal token,
+ * late initialization of the reserved words, case-insensitive keyword lookup, SQL operators
+ * as delimiters and additional comment styles.
+ */
+ class SqlLexical extends StdLexical {
+ // Token for a number written with a decimal point, e.g. "1.5" or "1.".
+ case class FloatLit(chars: String) extends Token {
+ override def toString: String = chars
+ }
+
+ /* This is a work around to support the lazy setting */
+ // Replaces the current reserved-word set with the given keywords.
+ def initialize(keywords: Seq[String]): Unit = {
+ reserved.clear()
+ reserved ++= keywords
+ }
+
+ /* Normalizes a keyword string by lower-casing it */
+ def normalizeKeyword(str: String): String = str.toLowerCase
+
+ // Operators and punctuation recognized as delimiter tokens.
+ delimiters += (
+ "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
+ ",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~", "<=>"
+ )
+
+ // The reserved-word lookup uses the normalized (lower-cased) name; a keyword token carries
+ // the normalized text, while an identifier token keeps the name exactly as written.
+ protected override def processIdent(name: String) = {
+ val token = normalizeKeyword(name)
+ if (reserved contains token) Keyword(token) else Identifier(name)
+ }
+
+ // Token rule. The alternatives are tried in the order written:
+ // 1. identifier or keyword starting with an identifier character
+ // 2. identifier or keyword starting with digits but containing an identifier character
+ // 3. digits with an optional fractional part: NumericLit without '.', FloatLit with '.'
+ // 4. single-quoted and double-quoted string literals (no line break inside)
+ // 5. back-quoted text, always an Identifier because it bypasses processIdent
+ // 6. end of input, unclosed string literals, delimiters, and finally "illegal character"
+ override lazy val token: Parser[Token] =
+ ( identChar ~ (identChar | digit).* ^^
+ { case first ~ rest => processIdent((first :: rest).mkString) }
+ | digit.* ~ identChar ~ (identChar | digit).* ^^
+ { case first ~ middle ~ rest => processIdent((first ++ (middle :: rest)).mkString) }
+ | rep1(digit) ~ ('.' ~> digit.*).? ^^ {
+ case i ~ None => NumericLit(i.mkString)
+ case i ~ Some(d) => FloatLit(i.mkString + "." + d.mkString)
+ }
+ | '\'' ~> chrExcept('\'', '\n', EofCh).* <~ '\'' ^^
+ { case chars => StringLit(chars mkString "") }
+ | '"' ~> chrExcept('"', '\n', EofCh).* <~ '"' ^^
+ { case chars => StringLit(chars mkString "") }
+ | '`' ~> chrExcept('`', '\n', EofCh).* <~ '`' ^^
+ { case chars => Identifier(chars mkString "") }
+ | EofCh ^^^ EOF
+ | '\'' ~> failure("unclosed string literal")
+ | '"' ~> failure("unclosed string literal")
+ | delim
+ | failure("illegal character")
+ )
+
+ // Identifier characters are letters and the underscore.
+ override def identChar: Parser[Elem] = letter | elem('_')
+
+ // Skipped input: whitespace characters, /* ... */ comments, and the line comments
+ // introduced by "//", "#" or "--" (each runs up to the line break or end of input).
+ override def whitespace: Parser[Any] =
+ ( whitespaceChar
+ | '/' ~ '*' ~ comment
+ | '/' ~ '/' ~ chrExcept(EofCh, '\n').*
+ | '#' ~ chrExcept(EofCh, '\n').*
+ | '-' ~ '-' ~ chrExcept(EofCh, '\n').*
+ | '/' ~ '*' ~ failure("unclosed comment")
+ ).*
+ }
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
new file mode 100644
index 0000000..a5088df
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -0,0 +1,968 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import java.util.regex.{Matcher, Pattern}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{LinkedHashSet, Map}
+import scala.language.implicitConversions
+import scala.util.matching.Regex
+
+import org.apache.hadoop.hive.ql.lib.Node
+import org.apache.hadoop.hive.ql.parse._
+import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.execution.command._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * TODO remove the duplicate code and add the common methods to common class.
+ * Parser for All Carbon DDL cases
+ */
+abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
+
+ // Logger named after the canonical name of the concrete parser class.
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ // DDL keywords. Each value is the regular expression produced by carbonKeyWord, i.e. the
+ // given word prefixed with the case-insensitive flag "(?i)".
+ protected val AGGREGATE = carbonKeyWord("AGGREGATE")
+ protected val AS = carbonKeyWord("AS")
+ protected val AGGREGATION = carbonKeyWord("AGGREGATION")
+ protected val ALL = carbonKeyWord("ALL")
+ // Note: the matched word is NO_DICTIONARY, not the name of the val.
+ protected val HIGH_CARDINALITY_DIMS = carbonKeyWord("NO_DICTIONARY")
+ protected val BEFORE = carbonKeyWord("BEFORE")
+ protected val BY = carbonKeyWord("BY")
+ protected val CARDINALITY = carbonKeyWord("CARDINALITY")
+ protected val CASCADE = carbonKeyWord("CASCADE")
+ protected val CLASS = carbonKeyWord("CLASS")
+ protected val CLEAN = carbonKeyWord("CLEAN")
+ protected val COLS = carbonKeyWord("COLS")
+ protected val COLUMNS = carbonKeyWord("COLUMNS")
+ protected val COMPACT = carbonKeyWord("COMPACT")
+ protected val CREATE = carbonKeyWord("CREATE")
+ protected val CUBE = carbonKeyWord("CUBE")
+ protected val CUBES = carbonKeyWord("CUBES")
+ protected val DATA = carbonKeyWord("DATA")
+ protected val DATABASE = carbonKeyWord("DATABASE")
+ protected val DATABASES = carbonKeyWord("DATABASES")
+ protected val DELETE = carbonKeyWord("DELETE")
+ protected val DELIMITER = carbonKeyWord("DELIMITER")
+ protected val DESCRIBE = carbonKeyWord("DESCRIBE")
+ protected val DESC = carbonKeyWord("DESC")
+ protected val DETAIL = carbonKeyWord("DETAIL")
+ protected val DIMENSIONS = carbonKeyWord("DIMENSIONS")
+ protected val DIMFOLDERPATH = carbonKeyWord("DIMFOLDERPATH")
+ protected val DROP = carbonKeyWord("DROP")
+ protected val ESCAPECHAR = carbonKeyWord("ESCAPECHAR")
+ protected val EXCLUDE = carbonKeyWord("EXCLUDE")
+ protected val EXPLAIN = carbonKeyWord("EXPLAIN")
+ protected val EXTENDED = carbonKeyWord("EXTENDED")
+ protected val FORMATTED = carbonKeyWord("FORMATTED")
+ protected val FACT = carbonKeyWord("FACT")
+ protected val FIELDS = carbonKeyWord("FIELDS")
+ protected val FILEHEADER = carbonKeyWord("FILEHEADER")
+ protected val SERIALIZATION_NULL_FORMAT = carbonKeyWord("SERIALIZATION_NULL_FORMAT")
+ protected val BAD_RECORDS_LOGGER_ENABLE = carbonKeyWord("BAD_RECORDS_LOGGER_ENABLE")
+ protected val BAD_RECORDS_ACTION = carbonKeyWord("BAD_RECORDS_ACTION")
+ protected val FILES = carbonKeyWord("FILES")
+ protected val FROM = carbonKeyWord("FROM")
+ protected val HIERARCHIES = carbonKeyWord("HIERARCHIES")
+ protected val IN = carbonKeyWord("IN")
+ protected val INCLUDE = carbonKeyWord("INCLUDE")
+ protected val INPATH = carbonKeyWord("INPATH")
+ protected val INTO = carbonKeyWord("INTO")
+ protected val LEVELS = carbonKeyWord("LEVELS")
+ protected val LIKE = carbonKeyWord("LIKE")
+ protected val LOAD = carbonKeyWord("LOAD")
+ protected val LOCAL = carbonKeyWord("LOCAL")
+ protected val MAPPED = carbonKeyWord("MAPPED")
+ protected val MEASURES = carbonKeyWord("MEASURES")
+ protected val MULTILINE = carbonKeyWord("MULTILINE")
+ protected val COMPLEX_DELIMITER_LEVEL_1 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_1")
+ protected val COMPLEX_DELIMITER_LEVEL_2 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_2")
+ protected val OPTIONS = carbonKeyWord("OPTIONS")
+ protected val OUTPATH = carbonKeyWord("OUTPATH")
+ protected val OVERWRITE = carbonKeyWord("OVERWRITE")
+ protected val PARTITION_COUNT = carbonKeyWord("PARTITION_COUNT")
+ protected val PARTITIONDATA = carbonKeyWord("PARTITIONDATA")
+ protected val PARTITIONER = carbonKeyWord("PARTITIONER")
+ protected val QUOTECHAR = carbonKeyWord("QUOTECHAR")
+ protected val RELATION = carbonKeyWord("RELATION")
+ protected val SCHEMA = carbonKeyWord("SCHEMA")
+ protected val SCHEMAS = carbonKeyWord("SCHEMAS")
+ protected val SHOW = carbonKeyWord("SHOW")
+ protected val TABLES = carbonKeyWord("TABLES")
+ protected val TABLE = carbonKeyWord("TABLE")
+ protected val TERMINATED = carbonKeyWord("TERMINATED")
+ protected val TYPE = carbonKeyWord("TYPE")
+ protected val USE = carbonKeyWord("USE")
+ protected val WHERE = carbonKeyWord("WHERE")
+ protected val WITH = carbonKeyWord("WITH")
+ protected val AGGREGATETABLE = carbonKeyWord("AGGREGATETABLE")
+ protected val ABS = carbonKeyWord("abs")
+
+ protected val FOR = carbonKeyWord("FOR")
+ protected val SCRIPTS = carbonKeyWord("SCRIPTS")
+ protected val USING = carbonKeyWord("USING")
+ protected val LIMIT = carbonKeyWord("LIMIT")
+ protected val DEFAULTS = carbonKeyWord("DEFAULTS")
+ protected val ALTER = carbonKeyWord("ALTER")
+ protected val ADD = carbonKeyWord("ADD")
+
+ protected val IF = carbonKeyWord("IF")
+ protected val NOT = carbonKeyWord("NOT")
+ protected val EXISTS = carbonKeyWord("EXISTS")
+ protected val DIMENSION = carbonKeyWord("DIMENSION")
+ protected val STARTTIME = carbonKeyWord("STARTTIME")
+ protected val SEGMENTS = carbonKeyWord("SEGMENTS")
+ protected val SEGMENT = carbonKeyWord("SEGMENT")
+
+ // Data type keywords.
+ protected val STRING = carbonKeyWord("STRING")
+ protected val INTEGER = carbonKeyWord("INTEGER")
+ protected val TIMESTAMP = carbonKeyWord("TIMESTAMP")
+ protected val DATE = carbonKeyWord("DATE")
+ protected val CHAR = carbonKeyWord("CHAR")
+ protected val NUMERIC = carbonKeyWord("NUMERIC")
+ protected val DECIMAL = carbonKeyWord("DECIMAL")
+ protected val DOUBLE = carbonKeyWord("DOUBLE")
+ // Note: the matched word is SMALLINT, not SHORT.
+ protected val SHORT = carbonKeyWord("SMALLINT")
+ protected val INT = carbonKeyWord("INT")
+ protected val BIGINT = carbonKeyWord("BIGINT")
+ protected val ARRAY = carbonKeyWord("ARRAY")
+ protected val STRUCT = carbonKeyWord("STRUCT")
+
+ // Non-empty text enclosed in double quotes / single quotes; the group captures the content.
+ protected val doubleQuotedString = "\"([^\"]+)\"".r
+ protected val singleQuotedString = "'([^']+)'".r
+
+ // Raw `str` of every public method of the concrete class whose return type is Keyword,
+ // collected through reflection while this class is being constructed.
+ // NOTE(review): the keyword vals above hold the regexes returned by carbonKeyWord, so this
+ // filter does not select them; presumably Keyword members come from subclasses - confirm.
+ // NOTE(review): the raw `str` is used here, whereas AbstractCarbonSparkSQLParser.reservedWords
+ // uses `normalize` and SqlLexical.processIdent looks up the lower-cased token - confirm that
+ // keywords not written in lower case are still recognized as intended.
+ protected val newReservedWords =
+ this.getClass
+ .getMethods
+ .filter(_.getReturnType == classOf[Keyword])
+ .map(_.invoke(this).asInstanceOf[Keyword].str)
+
+ // Lexer pre-loaded with newReservedWords; replaces the lexer defined by the parent class.
+ override val lexical = {
+ val sqllex = new SqlLexical()
+ sqllex.initialize(newReservedWords)
+ sqllex
+
+ }
+
+ import lexical.Identifier
+
+ /**
+  * Lets a regular expression be used as a parser: the resulting parser accepts one identifier
+  * token whose complete text matches the regex and yields that text unchanged.
+  */
+ implicit def regexToParser(regex: Regex): Parser[String] = {
+   val matchingIdentifier: PartialFunction[Elem, String] = {
+     case Identifier(text) if regex.unapplySeq(text).nonEmpty => text
+   }
+   acceptMatch(s"identifier matching regex ${ regex }", matchingIdentifier)
+ }
+
+ /**
+  * Builds the case-insensitive regular expression for a keyword.
+  *
+  * @param keys keyword text; it is placed verbatim after the `(?i)` flag
+  * @return the regex `(?i)` followed by `keys`
+  */
+ private def carbonKeyWord(keys: String) = {
+   val caseInsensitivePattern = s"(?i)$keys"
+   caseInsensitivePattern.r
+ }
+
+ // Non-empty text enclosed in back-quotes; the group captures the text between them.
+ protected val escapedIdentifier = "`([^`]+)`".r
+
+ /**
+  * Moves complex dimensions behind all other dimensions, keeping the relative order inside
+  * each of the two groups. A dimension is complex when its data type is exactly "Array" or
+  * "Struct"; a dimension without a data type is treated like any other non-complex one.
+  */
+ private def reorderDimensions(dims: Seq[Field]): Seq[Field] = {
+   def isComplex(field: Field): Boolean =
+     field.dataType.exists(typeName => typeName == "Array" || typeName == "Struct")
+   val (complexDims, plainDims) = dims.partition(isComplex)
+   plainDims ++ complexDims
+ }
+
+
+
+ /**
+  * Extracts the two integers of a `decimal(a,b)` type string.
+  *
+  * The numbers are returned in the order in which they are written, so `decimal(18,2)`
+  * yields `(18, 2)`. The `decimal` keyword is matched case-insensitively and blanks around
+  * the numbers are ignored.
+  *
+  * @param dataType type string that has to start with `decimal(<int>,<int>)`
+  * @return the first and the second integer found between the parentheses
+  * @throws IllegalArgumentException if `dataType` does not start with a parenthesized
+  *         decimal declaration holding at least two comma-separated integers
+  */
+ def getScaleAndPrecision(dataType: String): (Int, Int) = {
+   val m: Matcher =
+     Pattern.compile("^decimal\\(([^)]+)\\)", Pattern.CASE_INSENSITIVE).matcher(dataType)
+   // find() has to be checked: calling group(1) after a failed match only produces an
+   // IllegalStateException("No match found") that does not mention the offending input.
+   if (!m.find()) {
+     throw new IllegalArgumentException(
+       s"Invalid decimal data type '$dataType', expected decimal(<int>,<int>)")
+   }
+   m.group(1).split(",").map(_.trim) match {
+     // entries after the second one are ignored
+     case Array(first, second, _*) => (Integer.parseInt(first), Integer.parseInt(second))
+     case _ =>
+       throw new IllegalArgumentException(
+         s"Invalid decimal data type '$dataType', expected decimal(<int>,<int>)")
+   }
+ }
+
+ /**
+ * Prepares the TableModel for a create-table statement from the parsed details.
+ *
+ * Every field receives its position in `fields` as `schemaOrdinal`. Dimensions, measures,
+ * column properties, column groups and no-inverted-index columns are then derived from the
+ * fields and the table properties, and the table block size property is validated.
+ *
+ * @param ifNotExistPresent passed through to the TableModel unchanged
+ * @param dbName database name; CarbonCommonConstants.DATABASE_DEFAULT_NAME is used when empty
+ * @param tableName name of the table
+ * @param fields all columns of the table in declaration order
+ * @param partitionCols not referenced in the body of this method
+ * @param tableProperties table properties of the statement
+ * @return the populated TableModel
+ * A MalformedCarbonCommandException is thrown when no dimension column can be derived.
+ */
+ def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
+ , tableName: String, fields: Seq[Field],
+ partitionCols: Seq[PartitionerField],
+ tableProperties: Map[String, String]): TableModel
+ = {
+
+ // remember the declaration position of every column
+ fields.zipWithIndex.foreach { x =>
+ x._1.schemaOrdinal = x._2
+ }
+ val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
+ fields, tableProperties)
+ // a table without any dimension (key) column is rejected
+ if (dims.isEmpty) {
+ throw new MalformedCarbonCommandException(s"Table ${
+ dbName.getOrElse(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME)
+ }.$tableName"
+ +
+ " can not be created without key columns. Please " +
+ "use DICTIONARY_INCLUDE or " +
+ "DICTIONARY_EXCLUDE to set at least one key " +
+ "column " +
+ "if all specified columns are numeric types")
+ }
+ val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
+
+ // column properties
+ val colProps = extractColumnProperties(fields, tableProperties)
+ // get column groups configuration from table properties.
+ val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties,
+ noDictionaryDims, msrs, dims)
+
+ // get no inverted index columns from table properties.
+ val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
+
+ // validate the tableBlockSize from table properties
+ CommonUtil.validateTableBlockSize(tableProperties)
+
+ // dimensions are type-normalized, given their parent, and reordered before being stored
+ TableModel(
+ ifNotExistPresent,
+ dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
+ dbName,
+ tableName,
+ tableProperties,
+ reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
+ msrs.map(f => normalizeType(f)),
+ Option(noDictionaryDims),
+ Option(noInvertedIdxCols),
+ groupCols,
+ Some(colProps))
+ }
+
+ /**
+  * Reads the column groups configured under CarbonCommonConstants.COLUMN_GROUPS.
+  *
+  * The property value lists the groups in parentheses, e.g. "(col1,col2),(col3,col4)".
+  * Each group is validated against the groups accepted so far, rewritten by
+  * rearrangedColumnGroup and finally handed to CommonUtil.arrangeColGrpsInSchemaOrder.
+  *
+  * @param tableProperties table properties of the create-table statement
+  * @param noDictionaryDims names of the no-dictionary dimensions, used for validation
+  * @param msrs measure columns, used for validation
+  * @param dims dimension columns
+  * @return the arranged column groups, or null when the property is not set
+  */
+ protected def updateColumnGroupsInField(tableProperties: Map[String, String],
+     noDictionaryDims: Seq[String],
+     msrs: Seq[Field],
+     dims: Seq[Field]): Seq[String] = {
+   tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS) match {
+     case Some(rawGroups) =>
+       // every "(...)" section of the property value is one column group
+       val groupMatcher: Matcher = Pattern.compile("\\(([^)]+)\\)").matcher(rawGroups)
+       var acceptedGroups: Seq[String] = Seq[String]()
+       while (groupMatcher.find()) {
+         val groupText: String = groupMatcher.group(1)
+         CommonUtil.validateColumnGroup(groupText, noDictionaryDims, msrs, acceptedGroups, dims)
+         acceptedGroups = acceptedGroups :+ rearrangedColumnGroup(groupText, dims)
+       }
+       CommonUtil.arrangeColGrpsInSchemaOrder(acceptedGroups, dims)
+     case None =>
+       null
+   }
+ }
+
+ /**
+  * Rewrites one column group so that its columns appear in the order of `dims`.
+  *
+  * The comma-separated names are matched case-insensitively against the dimension names;
+  * names that match no dimension are ignored here. The matched dimensions have to occupy
+  * consecutive positions in `dims`.
+  *
+  * @param colGroup comma-separated column names of one group
+  * @param dims dimension columns that define the ordering
+  * @return the names of the matched dimensions, in `dims` order, joined with ","
+  * A MalformedCarbonCommandException is thrown when the matched positions are not
+  * consecutive, which includes a column that is listed twice.
+  */
+ def rearrangedColumnGroup(colGroup: String, dims: Seq[Field]): String = {
+   // built once instead of once per requested column name
+   val indexedDims = dims.zipWithIndex
+   val positions: Seq[Int] = colGroup.split(',').map(_.trim).toSeq.flatMap { requested =>
+     indexedDims.collect { case (dim, idx) if dim.column.equalsIgnoreCase(requested) => idx }
+   }.sorted
+   // neighbouring positions must differ by exactly one
+   val hasGap = positions.zip(positions.drop(1)).exists { case (prev, next) => next - prev != 1 }
+   if (hasGap) {
+     throw new MalformedCarbonCommandException(
+       "Invalid column group,column in group should be contiguous as per schema.")
+   }
+   positions.map(position => dims(position).column).mkString(",")
+ }
+
+ /**
+  * Builds the Partitioner for the given partition columns.
+  *
+  * The partition class is read from CarbonCommonConstants.PARTITIONCLASS and defaults to the
+  * empty string. The partition count is read from CarbonCommonConstants.PARTITIONCOUNT and
+  * stays at 1 when the property is missing or cannot be converted to an Int.
+  *
+  * @param partitionCols partition columns given by the user
+  * @param tableProperties table properties of the statement
+  * @return the Partitioner when at least one partition column is given, otherwise None
+  */
+ protected def getPartitionerObject(partitionCols: Seq[PartitionerField],
+     tableProperties: Map[String, String]):
+ Option[Partitioner] = {
+   // an empty class name is kept as is here
+   val partitionClass: String =
+     tableProperties.getOrElse(CarbonCommonConstants.PARTITIONCLASS, "")
+   val partitionCount: Int = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT) match {
+     case Some(rawCount) =>
+       // an unparsable count silently falls back to one partition
+       try rawCount.toInt catch { case _: Exception => 1 }
+     case None => 1
+   }
+   val partitionColNames: Array[String] = partitionCols.map(_.partitionColumn).toArray
+   if (partitionColNames.nonEmpty) {
+     Option(Partitioner(partitionClass, partitionColNames, partitionCount, null))
+   } else {
+     // without partition columns there is nothing to partition on
+     None
+   }
+ }
+
+ /**
+  * Collects the column properties of all fields into one map. For a field with a non-null
+  * children list the properties of its children are collected, for every other field the
+  * properties of the field itself.
+  */
+ protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):
+ java.util.Map[String, java.util.List[ColumnProperty]] = {
+   val propertiesByColumn = new java.util.HashMap[String, java.util.List[ColumnProperty]]()
+   for (field <- fields) {
+     val hasChildren = field.children.exists(_ != null)
+     if (hasChildren) {
+       fillAllChildrenColumnProperty(
+         field.column, field.children, tableProperties, propertiesByColumn)
+     } else {
+       fillColumnProperty(None, field.column, tableProperties, propertiesByColumn)
+     }
+   }
+   propertiesByColumn
+ }
+
+ /**
+  * Adds the column properties of every child field to `colPropMap`, using `parent` as the
+  * parent column name. Nothing happens when `fieldChildren` is empty.
+  */
+ protected def fillAllChildrenColumnProperty(parent: String, fieldChildren: Option[List[Field]],
+     tableProperties: Map[String, String],
+     colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]): Unit = {
+   for {
+     children <- fieldChildren
+     child <- children
+   } fillColumnProperty(Some(parent), child.column, tableProperties, colPropMap)
+ }
+
+ /**
+  * Looks up the properties configured for one column and, when there are any, stores them
+  * in `colPropMap`. The lookup key and the map key both come from getKey.
+  */
+ protected def fillColumnProperty(parentColumnName: Option[String],
+     columnName: String,
+     tableProperties: Map[String, String],
+     colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]): Unit = {
+   val (tablePropertyKey, columnPropertyKey) = getKey(parentColumnName, columnName)
+   CommonUtil.getColumnProperties(tablePropertyKey, tableProperties)
+     .foreach(found => colPropMap.put(columnPropertyKey, found))
+ }
+
+ /**
+  * Computes the pair (table property key, column property key) of a column.
+  *
+  * Without a parent both keys are the column name. With a parent the column property key is
+  * "parent.column"; the table property key is the same, except for a child named "val",
+  * where it is the parent name alone.
+  */
+ def getKey(parentColumnName: Option[String],
+     columnName: String): (String, String) = {
+   parentColumnName.fold((columnName, columnName)) { parent =>
+     val qualifiedName = parent + "." + columnName
+     val tablePropertyKey = if (columnName == "val") parent else qualifiedName
+     (tablePropertyKey, qualifiedName)
+   }
+ }
+
+ /**
+  * Extracts the columns listed in the NO_INVERTED_INDEX table property.
+  *
+  * @param fields all columns of the table
+  * @param tableProperties table properties of the statement
+  * @return the names of the listed columns as spelled in `fields`, in field order and
+  *         without duplicates; empty when the property is not set
+  * A MalformedCarbonCommandException is thrown for a listed name that matches no field
+  * (names are compared case-insensitively).
+  */
+ protected def extractNoInvertedIndexColumns(fields: Seq[Field],
+     tableProperties: Map[String, String]): Seq[String] = {
+   val requestedCols: Array[String] = tableProperties.get("NO_INVERTED_INDEX") match {
+     case Some(rawList) => rawList.split(',').map(_.trim)
+     case None => Array[String]()
+   }
+   // every requested column has to exist in the table
+   requestedCols.foreach { requested =>
+     if (!fields.exists(_.column.equalsIgnoreCase(requested))) {
+       throw new MalformedCarbonCommandException(
+         "NO_INVERTED_INDEX column: " + requested +
+         " does not exist in table. Please check create table statement.")
+     }
+   }
+   // walking the fields instead of the request keeps field order and yields each column once
+   val uniqueRequested = requestedCols.toSet
+   fields
+     .filter(field => uniqueRequested.exists(_.equalsIgnoreCase(field.column)))
+     .map(_.column)
+ }
+
+ /**
+ * Extracts the dimension fields and the names of the no-dictionary dimensions.
+ *
+ * A field becomes a dimension when it is listed in DICTIONARY_EXCLUDE or DICTIONARY_INCLUDE,
+ * or when its data type is one of those accepted by isDetectAsDimentionDatatype.
+ *
+ * @param fields all columns of the table
+ * @param tableProperties table properties of the statement
+ * @return the dimension fields in field order, and the names of the DICTIONARY_EXCLUDE
+ * columns whose data type is neither TIMESTAMP nor DATE
+ * A MalformedCarbonCommandException is thrown when a listed column does not exist, when a
+ * DICTIONARY_EXCLUDE column has an unsupported data type, or when a column is listed in both
+ * properties.
+ */
+ protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
+ tableProperties: Map[String, String]):
+ (Seq[Field], Seq[String]) = {
+ var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
+ var dictExcludeCols: Array[String] = Array[String]()
+ var noDictionaryDims: Seq[String] = Seq[String]()
+ var dictIncludeCols: Seq[String] = Seq[String]()
+
+ // All excluded cols should be there in create table cols
+ // (`map` is used only for its validation side effect; the result is discarded)
+ if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
+ dictExcludeCols =
+ tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+ dictExcludeCols
+ .map { dictExcludeCol =>
+ if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
+ val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
+ " does not exist in table. Please check create table statement."
+ throw new MalformedCarbonCommandException(errormsg)
+ } else {
+ // only string and timestamp columns may be excluded; array and struct get their own message
+ val dataType = fields.find(x =>
+ x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
+ if (isComplexDimDictionaryExclude(dataType)) {
+ val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " +
+ dictExcludeCol
+ throw new MalformedCarbonCommandException(errormsg)
+ } else if (!isStringAndTimestampColDictionaryExclude(dataType)) {
+ val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
+ " data type column: " + dictExcludeCol
+ throw new MalformedCarbonCommandException(errorMsg)
+ }
+ }
+ }
+ }
+ // All included cols should be there in create table cols
+ if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
+ dictIncludeCols =
+ tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
+ dictIncludeCols.map { distIncludeCol =>
+ if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
+ val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
+ " does not exist in table. Please check create table statement."
+ throw new MalformedCarbonCommandException(errormsg)
+ }
+ }
+ }
+
+ // a column must not be listed in both DICTIONARY_EXCLUDE and DICTIONARY_INCLUDE
+ dictExcludeCols.foreach { dicExcludeCol =>
+ if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
+ val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
+ " with DICTIONARY_INCLUDE. Please check create table statement."
+ throw new MalformedCarbonCommandException(errormsg)
+ }
+ }
+
+ // by default consider all String cols as dims and if any dictionary exclude is present then
+ // add it to noDictionaryDims list. consider all dictionary excludes/include cols as dims
+ fields.foreach(field => {
+
+ if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
+ // excluded timestamp/date columns stay dimensions but are not reported as no-dictionary
+ val dataType = DataTypeUtil.getDataType(field.dataType.get.toUpperCase())
+ if (dataType != DataType.TIMESTAMP && dataType != DataType.DATE ) {
+ noDictionaryDims :+= field.column
+ }
+ dimFields += field
+ } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
+ dimFields += (field)
+ } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
+ dimFields += (field)
+ }
+ }
+ )
+
+ (dimFields.toSeq, noDictionaryDims)
+ }
+
+  /**
+   * Adds `field` to `dimFields` when its column name is listed, ignoring case, in
+   * `dictIncludeCols`. `dimFields` is left untouched otherwise.
+   */
+  def fillNonStringDimension(dictIncludeCols: Seq[String],
+      field: Field, dimFields: LinkedHashSet[Field]): Unit = {
+    val listedForDictionary = dictIncludeCols.exists(col => field.column.equalsIgnoreCase(col))
+    if (listedForDictionary) {
+      dimFields += field
+    }
+  }
+
+  /**
+   * Tells whether the given data type name is one of string, array, struct, timestamp,
+   * date or char. The comparison ignores case.
+   *
+   * @param dimensionDatatype data type name to check
+   * @return true when the name matches one of the listed types
+   */
+  def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = {
+    Seq("string", "array", "struct", "timestamp", "date", "char")
+      .exists(_.equalsIgnoreCase(dimensionDatatype))
+  }
+
+  /**
+   * Tells whether the given data type name is a complex type, i.e. array or struct.
+   * The comparison ignores case.
+   */
+  def isComplexDimDictionaryExclude(dimensionDataType: String): Boolean = {
+    Seq("array", "struct").exists(_.equalsIgnoreCase(dimensionDataType))
+  }
+
+  /**
+   * Tells whether the given data type name is string or timestamp. The comparison
+   * ignores case.
+   */
+  def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
+    Seq("string", "timestamp").exists(_.equalsIgnoreCase(columnDataType))
+  }
+
+  /**
+   * Picks the measure columns out of `fields`. A field is a measure when its data type
+   * is not detected as a dimension type and its column is listed in neither
+   * DICTIONARY_INCLUDE nor DICTIONARY_EXCLUDE (names compared ignoring case).
+   *
+   * @param fields          all columns of the table, in declaration order
+   * @param tableProperties table properties holding the optional comma separated lists
+   * @return the measure fields, in the order they appear in `fields`
+   */
+  protected def extractMsrColsFromFields(fields: Seq[Field],
+      tableProperties: Map[String, String]): Seq[Field] = {
+    // trimmed column names stored under the property, or nothing when it is not set
+    def listedColumns(propertyKey: String): Array[String] = {
+      tableProperties.get(propertyKey) match {
+        case Some(commaSeparated) => commaSeparated.split(',').map(_.trim)
+        case None => Array[String]()
+      }
+    }
+
+    val dictIncludedCols = listedColumns(CarbonCommonConstants.DICTIONARY_INCLUDE)
+    val dictExcludedCols = listedColumns(CarbonCommonConstants.DICTIONARY_EXCLUDE)
+
+    def isListed(columns: Array[String], field: Field): Boolean =
+      columns.exists(_.equalsIgnoreCase(field.column))
+
+    fields.filter { field =>
+      !isDetectAsDimentionDatatype(field.dataType.get) &&
+      !isListed(dictIncludedCols, field) &&
+      !isListed(dictExcludedCols, field)
+    }.toList
+  }
+
+  /**
+   * Splits a table name node into its optional database name and its table name.
+   *
+   * @param tableNameParts node whose children are the identifier tokens
+   * @return (None, table) for one child, (Some(db), table) for two children
+   */
+  protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
+    val identifierParts = tableNameParts.getChildren.asScala.map {
+      case Token(part, Nil) => cleanIdentifier(part)
+    }
+    identifierParts match {
+      case Seq(tableOnly) => (None, tableOnly)
+      case Seq(databaseName, table) => (Some(databaseName), table)
+    }
+  }
+
+  /**
+   * Returns the text captured by the escapedIdentifier pattern when `ident` matches it,
+   * and `ident` itself otherwise.
+   */
+  protected def cleanIdentifier(ident: String): String = {
+    ident match {
+      case escapedIdentifier(unescaped) => unescaped
+      case _ => ident
+    }
+  }
+
+  /**
+   * Looks up one node per requested clause name.
+   *
+   * For every name in `clauseNames`, in order, the first not yet consumed node whose
+   * upper-cased text equals the name is taken; further nodes with the same text stay
+   * available for later names.
+   *
+   * @param clauseNames clause names to look for (compared against the upper-cased text)
+   * @param nodeList    candidate nodes
+   * @return for each clause name the matching node, or None when no node matched
+   * @throws RuntimeException (via sys.error) when some node matches no clause name;
+   *                          the message lists the text of the unhandled nodes
+   */
+  protected def getClauses(clauseNames: Seq[String], nodeList: Seq[ASTNode]): Seq[Option[Node]] = {
+    var remainingNodes = nodeList
+    val clauses = clauseNames.map { clauseName =>
+      val (matches, nonMatches) = remainingNodes.partition(_.getText.toUpperCase == clauseName)
+      // only the first match is consumed; the rest goes back into the pool
+      remainingNodes = nonMatches ++ matches.drop(1)
+      matches.headOption
+    }
+
+    if (remainingNodes.nonEmpty) {
+      // name the offending clauses so the user can see what was rejected
+      val unhandled = remainingNodes.map(_.getText).mkString(", ")
+      sys.error(
+        s"""Unhandled clauses: $unhandled.
+           |You are likely trying to use an unsupported carbon feature.""".stripMargin)
+    }
+    clauses
+  }
+
+  /**
+   * Extractor exposing an ASTNode as (text, children). A node without children yields
+   * Nil. Matching also records the node's line and column in CurrentOrigin.
+   */
+  object Token {
+    /** @return matches of the form (tokenName, children). */
+    def unapply(t: Any): Option[(String, Seq[ASTNode])] = {
+      t match {
+        case node: ASTNode =>
+          CurrentOrigin.setPosition(node.getLine, node.getCharPositionInLine)
+          val tokenName = node.getText
+          // getChildren may be null, hence the Option wrapper
+          val children =
+            Option(node.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]]
+          Some((tokenName, children))
+        case _ => None
+      }
+    }
+  }
+
+  /**
+   * Reads the key/value pairs of a TOK_TABLEPROPLIST node. Keys and values are both
+   * passed through unquoteString.
+   *
+   * @param node the table property list node
+   * @return one (key, value) pair per TOK_TABLEPROPERTY child
+   */
+  protected def getProperties(node: Node): Seq[(String, String)] = {
+    node match {
+      case Token("TOK_TABLEPROPLIST", propertyNodes) =>
+        propertyNodes.map {
+          case Token("TOK_TABLEPROPERTY", Token(rawKey, Nil) :: Token(rawValue, Nil) :: Nil) =>
+            (unquoteString(rawKey), unquoteString(rawValue))
+        }
+    }
+  }
+
+  /**
+   * Returns the lower-cased text captured by singleQuotedString or doubleQuotedString
+   * when `str` matches one of them; any other string is returned unchanged.
+   */
+  protected def unquoteString(str: String) = {
+    str match {
+      case singleQuotedString(inner) => inner.toLowerCase()
+      case doubleQuotedString(inner) => inner.toLowerCase()
+      case _ => str
+    }
+  }
+
+  /**
+   * Validates a list of (name, value) options. Option names are compared ignoring case.
+   *
+   * Checks, in this order:
+   *  - every option name is one of the supported options,
+   *  - COLUMNDICT and ALL_DICTIONARY_PATH are not both present,
+   *  - the value of MAXCOLUMNS, when present, parses as an integer,
+   *  - no option is given more than once.
+   *
+   * @param optionList the options to validate; None is treated as "no options"
+   * @throws MalformedCarbonCommandException when one of the checks fails
+   */
+  protected def validateOptions(optionList: Option[List[(String, String)]]): Unit = {
+    val allOptions = optionList.getOrElse(Nil)
+    val optionNames = allOptions.map(_._1)
+
+    def isPresent(optionName: String): Boolean = optionNames.exists(_.equalsIgnoreCase(optionName))
+
+    // validate with all supported options
+    val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
+      "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
+      "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
+      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "USE_KETTLE", "DATEFORMAT"
+    )
+    val invalidOptions = optionNames.distinct
+      .filterNot(name => supportedOptions.exists(_.equalsIgnoreCase(name)))
+    if (invalidOptions.nonEmpty) {
+      val errorMessage = "Error: Invalid option(s): " + invalidOptions.mkString(", ")
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+
+    // COLUMNDICT and ALL_DICTIONARY_PATH can not be used together.
+    if (isPresent("COLUMNDICT") && isPresent("ALL_DICTIONARY_PATH")) {
+      val errorMessage = "Error: COLUMNDICT and ALL_DICTIONARY_PATH can not be used together" +
+                         " in options"
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+
+    // look the value up the same case-insensitive way the presence is detected,
+    // so that e.g. "MAXCOLUMNS" does not fail on a missing "maxcolumns" key
+    allOptions.find(_._1.equalsIgnoreCase("MAXCOLUMNS")).foreach { case (_, maxColumns) =>
+      try {
+        maxColumns.toInt
+      } catch {
+        case ex: NumberFormatException =>
+          throw new MalformedCarbonCommandException(
+            "option MAXCOLUMNS can only contain integer values")
+      }
+    }
+
+    // check for duplicate options; names differing only in case count as the same option
+    val duplicateOptions = allOptions
+      .groupBy(_._1.toUpperCase(java.util.Locale.ROOT))
+      .values
+      .filter(_.size > 1)
+      .map(_.head._1)
+    if (duplicateOptions.nonEmpty) {
+      val errorMessage = "Error: Duplicate option(s): " + duplicateOptions.mkString(", ")
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+  }
+
+  /**
+   * Parses `table` or `db.table` into Seq(table) or Seq(db, table) respectively.
+   */
+  protected lazy val dbTableIdentifier: Parser[Seq[String]] =
+    (ident <~ ".").? ~ (ident) ^^ {
+      case databaseName ~ tableName => databaseName.toList :+ tableName
+    }
+
+  /**
+   * Parses one `'name' = 'value'` pair. The name is trimmed and lower-cased, the value
+   * is kept as written.
+   */
+  protected lazy val loadOptions: Parser[(String, String)] =
+    (stringLit <~ "=") ~ stringLit ^^ {
+      case optionName ~ optionValue => (optionName.trim.toLowerCase(), optionValue)
+      case _ => ("", "")
+    }
+
+
+ protected lazy val dimCol: Parser[Field] = anyFieldDef // alias: same grammar as anyFieldDef
+
+  /**
+   * Parses a primitive data type keyword and yields its lower-case type name.
+   * Decimal and char go through decimalType and charType.
+   */
+  protected lazy val primitiveTypes =
+    STRING ^^^ "string" |
+    INTEGER ^^^ "integer" |
+    TIMESTAMP ^^^ "timestamp" |
+    NUMERIC ^^^ "numeric" |
+    BIGINT ^^^ "bigint" |
+    SHORT ^^^ "smallint" |
+    INT ^^^ "int" |
+    DOUBLE ^^^ "double" |
+    decimalType |
+    DATE ^^^ "date" |
+    charType
+
+  /**
+   * Matches the char data type with an optional length, e.g. char or char(10), and
+   * returns it as text: the matched keyword, followed by "(length)" when a length
+   * was given.
+   */
+  private lazy val charType =
+    CHAR ~ ("(" ~> numericLit <~ ")").? ^^ {
+      case char ~ length =>
+        // `length` is an Option: render its value only, never "Some(..)" or "None"
+        length.fold(char)(len => s"$char($len)")
+    }
+
+  /**
+   * Matches a decimal type with precision and scale, e.g. decimal(10,0), and returns
+   * it as text in the form "keyword(precision, scale)".
+   */
+  private lazy val decimalType =
+    DECIMAL ~ ("(" ~> numericLit <~ ",") ~ (numericLit <~ ")") ^^ {
+      case keyword ~ precisionLit ~ scaleLit =>
+        s"$keyword($precisionLit, $scaleLit)"
+    }
+
+  /**
+   * Parses a column type: a struct, an array or a primitive type, tried in that order.
+   */
+  protected lazy val nestedType: Parser[Field] =
+    structFieldType |
+    arrayFieldType |
+    primitiveFieldType
+
+  /**
+   * Parses `name [:] type [IN store]` into a Field. The parsed name is used both as
+   * column and as name; data type and children come from the parsed type.
+   */
+  lazy val anyFieldDef: Parser[Field] =
+    (ident | stringLit) ~ ((":").? ~> nestedType) ~ (IN ~> (ident | stringLit)).? ^^ {
+      case columnName ~ typeField ~ storeType =>
+        Field(columnName, typeField.dataType, Some(columnName), typeField.children, null,
+          storeType)
+    }
+
+  /**
+   * Wraps a parsed primitive type name into a placeholder Field named "unknown".
+   */
+  protected lazy val primitiveFieldType: Parser[Field] =
+    primitiveTypes ^^ { typeName =>
+      Field("unknown", Some(typeName), Some("unknown"), Some(null))
+    }
+
+  /**
+   * Parses `array<type>` into a placeholder Field named "unknown" of type "array" whose
+   * single child, named "val", carries the element's data type and children.
+   */
+  protected lazy val arrayFieldType: Parser[Field] =
+    ((ARRAY ^^^ "array") ~> "<" ~> nestedType <~ ">") ^^ { elementType =>
+      val element = Field("val", elementType.dataType, Some("val"), elementType.children)
+      Field("unknown", Some("array"), Some("unknown"), Some(List(element)))
+    }
+
+  /**
+   * Parses `struct<field, field, ...>` into a placeholder Field named "unknown" of type
+   * "struct" whose children are the parsed member fields.
+   */
+  protected lazy val structFieldType: Parser[Field] =
+    ((STRUCT ^^^ "struct") ~> "<" ~> repsep(anyFieldDef, ",") <~ ">") ^^ { members =>
+      Field("unknown", Some("struct"), Some("unknown"), Some(members))
+    }
+
+  /**
+   * Parses `name [type] [AS alias] [IN store]` into a Field built from the name, the
+   * optional type and the optional alias. The parsed IN part is not used.
+   */
+  protected lazy val measureCol: Parser[Field] =
+    (ident | stringLit) ~ (INTEGER ^^^ "integer" | NUMERIC ^^^ "numeric" | SHORT ^^^ "smallint" |
+                           BIGINT ^^^ "bigint" | DECIMAL ^^^ "decimal").? ~
+    (AS ~> (ident | stringLit)).? ~ (IN ~> (ident | stringLit)).? ^^ {
+      case columnName ~ typeName ~ alias ~ _ => Field(columnName, typeName, alias, Some(null))
+    }
+
+  /**
+   * Rewrites the lower-case data type name of `field` into its capitalised form, e.g.
+   * "string" -> "String", "int" -> "Integer", "bigint" -> "BigInt".
+   *
+   * Array and struct fields get their children normalized recursively; all other
+   * listed types get their children set to Some(null). A type that starts with
+   * "decimal", such as decimal(10,0), becomes "Decimal" with the precision and scale
+   * returned by getScaleAndPrecision. Any other type leaves the field as it is.
+   */
+  private def normalizeType(field: Field): Field = {
+    // leaf column: new type name, children reset to Some(null), everything else kept
+    def asLeaf(typeName: String): Field =
+      field.copy(dataType = Some(typeName), children = Some(null))
+
+    // complex column: new type name, children normalized one by one
+    def asComplex(typeName: String): Field =
+      field.copy(dataType = Some(typeName),
+        children = field.children.map(_.map(normalizeType)))
+
+    field.dataType.getOrElse("NIL") match {
+      case "string" => asLeaf("String")
+      case "smallint" => asLeaf("SmallInt")
+      case "integer" | "int" => asLeaf("Integer")
+      case "long" => asLeaf("Long")
+      case "double" => asLeaf("Double")
+      case "timestamp" => asLeaf("Timestamp")
+      case "numeric" => asLeaf("Numeric")
+      case "bigint" => asLeaf("BigInt")
+      case "decimal" => asLeaf("Decimal")
+      case "array" => asComplex("Array")
+      case "struct" => asComplex("Struct")
+      // a nested child type such as decimal(10,0): extract precision and scale and
+      // reset the data type to Decimal
+      case parameterized if parameterized.startsWith("decimal") =>
+        val (precision, scale) = getScaleAndPrecision(parameterized)
+        asLeaf("Decimal").copy(precision = precision, scale = scale)
+      case _ => field
+    }
+  }
+
+  /**
+   * For an "Array" or "Struct" field, returns a copy whose children have been passed
+   * through appendParentForEachChild with this field's column as parent name. Fields
+   * of any other type are returned unchanged.
+   *
+   * The rebuilt field keeps column, name, parent, storeType, schemaOrdinal and
+   * rawSchema of the original.
+   */
+  private def addParent(field: Field): Field = {
+    field.dataType.getOrElse("NIL") match {
+      case complexType @ ("Array" | "Struct") =>
+        Field(field.column, Some(complexType), field.name,
+          field.children.map(_.map(appendParentForEachChild(_, field.column))), field.parent,
+          // carry rawSchema over explicitly, otherwise it falls back to the "" default
+          field.storeType, field.schemaOrdinal, rawSchema = field.rawSchema)
+      case _ => field
+    }
+  }
+
+  /**
+   * Builds the child field as seen from its parent: column and name get the prefix
+   * "parentName." and `parent` is set to `parentName`. Array and struct children are
+   * processed recursively with the prefixed column as their parent name. Decimal
+   * children additionally keep storeType, schemaOrdinal, precision and scale. A field
+   * whose data type is not one of the listed names is returned unchanged.
+   */
+  private def appendParentForEachChild(field: Field, parentName: String): Field = {
+    // defs, not vals: nothing is evaluated for fields that are returned unchanged
+    def qualifiedColumn = parentName + "." + field.column
+    // NOTE(review): a missing name is rendered as the text "None" here, as before
+    def qualifiedName = Some(parentName + "." + field.name.getOrElse(None))
+    def qualifiedChildren =
+      field.children.map(_.map(appendParentForEachChild(_, parentName + "." + field.column)))
+
+    field.dataType.getOrElse("NIL") match {
+      case leafType @ ("String" | "SmallInt" | "Integer" | "Long" | "Double" | "Timestamp" |
+                       "Numeric" | "BigInt") =>
+        Field(qualifiedColumn, Some(leafType), qualifiedName, Some(null), parentName)
+      case complexType @ ("Array" | "Struct") =>
+        Field(qualifiedColumn, Some(complexType), qualifiedName, qualifiedChildren, parentName)
+      case "Decimal" =>
+        Field(qualifiedColumn, Some("Decimal"), qualifiedName, Some(null), parentName,
+          field.storeType, field.schemaOrdinal, field.precision, field.scale)
+      case _ => field
+    }
+  }
+
+  /**
+   * Parses a segment id: either a numeric literal, or a token whose class is named
+   * FloatLit or DecimalLit, returned as its text.
+   */
+  protected lazy val segmentId: Parser[String] = {
+    val fractionalTokenClasses = Set("FloatLit", "DecimalLit")
+    val wholeNumber = numericLit ^^ { digits => digits }
+    val fractionalNumber =
+      elem("decimal", token => fractionalTokenClasses.contains(token.getClass.getSimpleName)) ^^
+      (_.chars)
+    wholeNumber | fractionalNumber
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 531b691..f646f1d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -56,7 +56,7 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri
children: Option[List[Field]], parent: String = null,
storeType: Option[String] = Some("columnar"),
var schemaOrdinal: Int = -1,
- var precision: Int = 0, var scale: Int = 0)
+ var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "")
case class ColumnProperty(key: String, value: String)
@@ -108,12 +108,12 @@ case class CompactionCallableModel(storePath: String,
compactionType: CompactionType)
object TableNewProcessor {
- def apply(cm: TableModel, sqlContext: SQLContext): TableInfo = {
- new TableNewProcessor(cm, sqlContext).process
+ def apply(cm: TableModel): TableInfo = {
+ new TableNewProcessor(cm).process
}
}
-class TableNewProcessor(cm: TableModel, sqlContext: SQLContext) {
+class TableNewProcessor(cm: TableModel) {
var index = 0
var rowGroup = 0
[2/4] incubator-carbondata git commit: Initial commit
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 21864d1..16e35f4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -17,13 +17,9 @@
package org.apache.spark.sql
-import java.util.regex.{Matcher, Pattern}
-
import scala.collection.JavaConverters._
-import scala.collection.mutable.LinkedHashSet
import scala.collection.mutable.Map
import scala.language.implicitConversions
-import scala.util.matching.Regex
import org.apache.hadoop.hive.ql.lib.Node
import org.apache.hadoop.hive.ql.parse._
@@ -31,154 +27,18 @@ import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.execution.ExplainCommand
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.DescribeCommand
import org.apache.spark.sql.hive.HiveQlWrapper
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
/**
* Parser for All Carbon DDL, DML cases in Unified context
*/
-class CarbonSqlParser() extends AbstractSparkSQLParser {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- protected val AGGREGATE = carbonKeyWord("AGGREGATE")
- protected val AS = carbonKeyWord("AS")
- protected val AGGREGATION = carbonKeyWord("AGGREGATION")
- protected val ALL = carbonKeyWord("ALL")
- protected val HIGH_CARDINALITY_DIMS = carbonKeyWord("NO_DICTIONARY")
- protected val BEFORE = carbonKeyWord("BEFORE")
- protected val BY = carbonKeyWord("BY")
- protected val CARDINALITY = carbonKeyWord("CARDINALITY")
- protected val CASCADE = carbonKeyWord("CASCADE")
- protected val CLASS = carbonKeyWord("CLASS")
- protected val CLEAN = carbonKeyWord("CLEAN")
- protected val COLS = carbonKeyWord("COLS")
- protected val COLUMNS = carbonKeyWord("COLUMNS")
- protected val CREATE = carbonKeyWord("CREATE")
- protected val CUBE = carbonKeyWord("CUBE")
- protected val CUBES = carbonKeyWord("CUBES")
- protected val DATA = carbonKeyWord("DATA")
- protected val DATABASE = carbonKeyWord("DATABASE")
- protected val DATABASES = carbonKeyWord("DATABASES")
- protected val DELETE = carbonKeyWord("DELETE")
- protected val DELIMITER = carbonKeyWord("DELIMITER")
- protected val DESCRIBE = carbonKeyWord("DESCRIBE")
- protected val DESC = carbonKeyWord("DESC")
- protected val DETAIL = carbonKeyWord("DETAIL")
- protected val DIMENSIONS = carbonKeyWord("DIMENSIONS")
- protected val DIMFOLDERPATH = carbonKeyWord("DIMFOLDERPATH")
- protected val DROP = carbonKeyWord("DROP")
- protected val ESCAPECHAR = carbonKeyWord("ESCAPECHAR")
- protected val EXCLUDE = carbonKeyWord("EXCLUDE")
- protected val EXPLAIN = carbonKeyWord("EXPLAIN")
- protected val EXTENDED = carbonKeyWord("EXTENDED")
- protected val FORMATTED = carbonKeyWord("FORMATTED")
- protected val FACT = carbonKeyWord("FACT")
- protected val FIELDS = carbonKeyWord("FIELDS")
- protected val FILEHEADER = carbonKeyWord("FILEHEADER")
- protected val SERIALIZATION_NULL_FORMAT = carbonKeyWord("SERIALIZATION_NULL_FORMAT")
- protected val BAD_RECORDS_LOGGER_ENABLE = carbonKeyWord("BAD_RECORDS_LOGGER_ENABLE")
- protected val BAD_RECORDS_ACTION = carbonKeyWord("BAD_RECORDS_ACTION")
- protected val FILES = carbonKeyWord("FILES")
- protected val FROM = carbonKeyWord("FROM")
- protected val HIERARCHIES = carbonKeyWord("HIERARCHIES")
- protected val IN = carbonKeyWord("IN")
- protected val INCLUDE = carbonKeyWord("INCLUDE")
- protected val INPATH = carbonKeyWord("INPATH")
- protected val INTO = carbonKeyWord("INTO")
- protected val LEVELS = carbonKeyWord("LEVELS")
- protected val LIKE = carbonKeyWord("LIKE")
- protected val LOAD = carbonKeyWord("LOAD")
- protected val LOCAL = carbonKeyWord("LOCAL")
- protected val MAPPED = carbonKeyWord("MAPPED")
- protected val MEASURES = carbonKeyWord("MEASURES")
- protected val MULTILINE = carbonKeyWord("MULTILINE")
- protected val COMPLEX_DELIMITER_LEVEL_1 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_1")
- protected val COMPLEX_DELIMITER_LEVEL_2 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_2")
- protected val OPTIONS = carbonKeyWord("OPTIONS")
- protected val OUTPATH = carbonKeyWord("OUTPATH")
- protected val OVERWRITE = carbonKeyWord("OVERWRITE")
- protected val PARTITION_COUNT = carbonKeyWord("PARTITION_COUNT")
- protected val PARTITIONDATA = carbonKeyWord("PARTITIONDATA")
- protected val PARTITIONER = carbonKeyWord("PARTITIONER")
- protected val QUOTECHAR = carbonKeyWord("QUOTECHAR")
- protected val RELATION = carbonKeyWord("RELATION")
- protected val SCHEMA = carbonKeyWord("SCHEMA")
- protected val SCHEMAS = carbonKeyWord("SCHEMAS")
- protected val SHOW = carbonKeyWord("SHOW")
- protected val TABLES = carbonKeyWord("TABLES")
- protected val TABLE = carbonKeyWord("TABLE")
- protected val TERMINATED = carbonKeyWord("TERMINATED")
- protected val TYPE = carbonKeyWord("TYPE")
- protected val USE = carbonKeyWord("USE")
- protected val WHERE = carbonKeyWord("WHERE")
- protected val WITH = carbonKeyWord("WITH")
- protected val AGGREGATETABLE = carbonKeyWord("AGGREGATETABLE")
- protected val ABS = carbonKeyWord("abs")
-
- protected val FOR = carbonKeyWord("FOR")
- protected val SCRIPTS = carbonKeyWord("SCRIPTS")
- protected val USING = carbonKeyWord("USING")
- protected val LIMIT = carbonKeyWord("LIMIT")
- protected val DEFAULTS = carbonKeyWord("DEFAULTS")
- protected val ALTER = carbonKeyWord("ALTER")
- protected val ADD = carbonKeyWord("ADD")
-
- protected val IF = carbonKeyWord("IF")
- protected val NOT = carbonKeyWord("NOT")
- protected val EXISTS = carbonKeyWord("EXISTS")
- protected val DIMENSION = carbonKeyWord("DIMENSION")
- protected val STARTTIME = carbonKeyWord("STARTTIME")
- protected val SEGMENTS = carbonKeyWord("SEGMENTS")
- protected val SEGMENT = carbonKeyWord("SEGMENT")
-
- protected val STRING = carbonKeyWord("STRING")
- protected val INTEGER = carbonKeyWord("INTEGER")
- protected val TIMESTAMP = carbonKeyWord("TIMESTAMP")
- protected val DATE = carbonKeyWord("DATE")
- protected val CHAR = carbonKeyWord("CHAR")
- protected val NUMERIC = carbonKeyWord("NUMERIC")
- protected val DECIMAL = carbonKeyWord("DECIMAL")
- protected val DOUBLE = carbonKeyWord("DOUBLE")
- protected val SHORT = carbonKeyWord("SMALLINT")
- protected val INT = carbonKeyWord("INT")
- protected val BIGINT = carbonKeyWord("BIGINT")
- protected val ARRAY = carbonKeyWord("ARRAY")
- protected val STRUCT = carbonKeyWord("STRUCT")
-
- protected val doubleQuotedString = "\"([^\"]+)\"".r
- protected val singleQuotedString = "'([^']+)'".r
-
- protected val newReservedWords =
- this.getClass
- .getMethods
- .filter(_.getReturnType == classOf[Keyword])
- .map(_.invoke(this).asInstanceOf[Keyword].str)
-
- override val lexical = {
- val sqllex = new SqlLexical()
- sqllex.initialize(newReservedWords)
- sqllex
-
- }
-
- import lexical.Identifier
-
- implicit def regexToParser(regex: Regex): Parser[String] = {
- acceptMatch(
- s"identifier matching regex ${ regex }",
- { case Identifier(str) if regex.unapplySeq(str).isDefined => str }
- )
- }
+class CarbonSqlParser() extends CarbonDDLSqlParser {
override def parse(input: String): LogicalPlan = {
synchronized {
@@ -196,26 +56,14 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
}
}
- /**
- * This will convert key word to regular expression.
- *
- * @param keys
- * @return
- */
- private def carbonKeyWord(keys: String) = {
- ("(?i)" + keys).r
- }
-
override protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
- protected lazy val startCommand: Parser[LogicalPlan] = createDatabase | dropDatabase |
- loadManagement | describeTable |
- showLoads | alterTable | createTable
-
- protected lazy val loadManagement: Parser[LogicalPlan] = deleteLoadsByID | deleteLoadsByLoadDate |
- cleanFiles | loadDataNew
+ protected lazy val startCommand: Parser[LogicalPlan] =
+ createDatabase | dropDatabase | loadManagement | describeTable |
+ showLoads | alterTable | createTable
- protected val escapedIdentifier = "`([^`]+)`".r
+ protected lazy val loadManagement: Parser[LogicalPlan] =
+ deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
protected lazy val createDatabase: Parser[LogicalPlan] =
CREATE ~> (DATABASE | SCHEMA) ~> restInput ^^ {
@@ -253,19 +101,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
DropDatabase(dbName, isCascade, dropDbSql)
}
- private def reorderDimensions(dims: Seq[Field]): Seq[Field] = {
- var complexDimensions: Seq[Field] = Seq()
- var dimensions: Seq[Field] = Seq()
- dims.foreach { dimension =>
- dimension.dataType.getOrElse("NIL") match {
- case "Array" => complexDimensions = complexDimensions :+ dimension
- case "Struct" => complexDimensions = complexDimensions :+ dimension
- case _ => dimensions = dimensions :+ dimension
- }
- }
- dimensions ++ complexDimensions
- }
-
protected lazy val alterTable: Parser[LogicalPlan] =
ALTER ~> TABLE ~> restInput ^^ {
case statement =>
@@ -303,14 +138,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
}
}
- private def getScaleAndPrecision(dataType: String): (Int, Int) = {
- val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
- m.find()
- val matchedString: String = m.group(1)
- val scaleAndPrecision = matchedString.split(",")
- (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
- }
-
/**
* This function will traverse the tree and logical plan will be formed using that.
*
@@ -386,6 +213,7 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
if(f.dataType.getOrElse("").startsWith("char")) {
f.dataType = Some("char")
}
+ f.rawSchema = x
fields ++= Seq(f)
}
}
@@ -494,511 +322,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
}
}
- /**
- * This will prepate the Model from the Tree details.
- *
- * @param ifNotExistPresent
- * @param dbName
- * @param tableName
- * @param fields
- * @param partitionCols
- * @param tableProperties
- * @return
- */
- protected def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
- , tableName: String, fields: Seq[Field],
- partitionCols: Seq[PartitionerField],
- tableProperties: Map[String, String]): TableModel
- = {
-
- fields.zipWithIndex.foreach { x =>
- x._1.schemaOrdinal = x._2
- }
- val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
- fields, tableProperties)
- if (dims.isEmpty) {
- throw new MalformedCarbonCommandException(s"Table ${
- dbName.getOrElse(
- CarbonCommonConstants.DATABASE_DEFAULT_NAME)
- }.$tableName"
- +
- " can not be created without key columns. Please " +
- "use DICTIONARY_INCLUDE or " +
- "DICTIONARY_EXCLUDE to set at least one key " +
- "column " +
- "if all specified columns are numeric types")
- }
- val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
-
- // column properties
- val colProps = extractColumnProperties(fields, tableProperties)
- // get column groups configuration from table properties.
- val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties,
- noDictionaryDims, msrs, dims)
-
- // get no inverted index columns from table properties.
- val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
-
- // validate the tableBlockSize from table properties
- CommonUtil.validateTableBlockSize(tableProperties)
-
- TableModel(
- ifNotExistPresent,
- dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
- dbName,
- tableName,
- tableProperties,
- reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
- msrs.map(f => normalizeType(f)),
- Option(noDictionaryDims),
- Option(noInvertedIdxCols),
- groupCols,
- Some(colProps))
- }
-
- /**
- * Extract the column groups configuration from table properties.
- * Based on this Row groups of fields will be determined.
- *
- * @param tableProperties
- * @return
- */
- protected def updateColumnGroupsInField(tableProperties: Map[String, String],
- noDictionaryDims: Seq[String],
- msrs: Seq[Field],
- dims: Seq[Field]): Seq[String] = {
- if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) {
-
- var splittedColGrps: Seq[String] = Seq[String]()
- val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
-
- // row groups will be specified in table properties like -> "(col1,col2),(col3,col4)"
- // here first splitting the value by () . so that the above will be splitted into 2 strings.
- // [col1,col2] [col3,col4]
- val m: Matcher = Pattern.compile("\\(([^)]+)\\)").matcher(nonSplitCols)
- while (m.find()) {
- val oneGroup: String = m.group(1)
- CommonUtil.validateColumnGroup(oneGroup, noDictionaryDims, msrs, splittedColGrps, dims)
- val arrangedColGrp = rearrangedColumnGroup(oneGroup, dims)
- splittedColGrps :+= arrangedColGrp
- }
- // This will be furthur handled.
- CommonUtil.arrangeColGrpsInSchemaOrder(splittedColGrps, dims)
- } else {
- null
- }
- }
-
- def rearrangedColumnGroup(colGroup: String, dims: Seq[Field]): String = {
- // if columns in column group is not in schema order than arrange it in schema order
- var colGrpFieldIndx: Seq[Int] = Seq[Int]()
- colGroup.split(',').map(_.trim).foreach { x =>
- dims.zipWithIndex.foreach { dim =>
- if (dim._1.column.equalsIgnoreCase(x)) {
- colGrpFieldIndx :+= dim._2
- }
- }
- }
- // sort it
- colGrpFieldIndx = colGrpFieldIndx.sorted
- // check if columns in column group is in schema order
- if (!checkIfInSequence(colGrpFieldIndx)) {
- throw new MalformedCarbonCommandException("Invalid column group:" + colGroup)
- }
- def checkIfInSequence(colGrpFieldIndx: Seq[Int]): Boolean = {
- for (i <- 0 until (colGrpFieldIndx.length - 1)) {
- if ((colGrpFieldIndx(i + 1) - colGrpFieldIndx(i)) != 1) {
- throw new MalformedCarbonCommandException(
- "Invalid column group,column in group should be contiguous as per schema.")
- }
- }
- true
- }
- val colGrpNames: StringBuilder = StringBuilder.newBuilder
- for (i <- colGrpFieldIndx.indices) {
- colGrpNames.append(dims(colGrpFieldIndx(i)).column)
- if (i < (colGrpFieldIndx.length - 1)) {
- colGrpNames.append(",")
- }
- }
- colGrpNames.toString()
- }
-
- /**
- * For getting the partitioner Object
- *
- * @param partitionCols
- * @param tableProperties
- * @return
- */
- protected def getPartitionerObject(partitionCols: Seq[PartitionerField],
- tableProperties: Map[String, String]):
- Option[Partitioner] = {
-
- // by default setting partition class empty.
- // later in table schema it is setting to default value.
- var partitionClass: String = ""
- var partitionCount: Int = 1
- var partitionColNames: Array[String] = Array[String]()
- if (tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).isDefined) {
- partitionClass = tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).get
- }
-
- if (tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).isDefined) {
- try {
- partitionCount = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).get.toInt
- } catch {
- case e: Exception => // no need to do anything.
- }
- }
-
- partitionCols.foreach(col =>
- partitionColNames :+= col.partitionColumn
- )
-
- // this means user has given partition cols list
- if (!partitionColNames.isEmpty) {
- return Option(Partitioner(partitionClass, partitionColNames, partitionCount, null))
- }
- // if partition cols are not given then no need to do partition.
- None
- }
-
- protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):
- java.util.Map[String, java.util.List[ColumnProperty]] = {
- val colPropMap = new java.util.HashMap[String, java.util.List[ColumnProperty]]()
- fields.foreach { field =>
- if (field.children.isDefined && field.children.get != null) {
- fillAllChildrenColumnProperty(field.column, field.children, tableProperties, colPropMap)
- } else {
- fillColumnProperty(None, field.column, tableProperties, colPropMap)
- }
- }
- colPropMap
- }
-
- protected def fillAllChildrenColumnProperty(parent: String, fieldChildren: Option[List[Field]],
- tableProperties: Map[String, String],
- colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
- fieldChildren.foreach(fields => {
- fields.foreach(field => {
- fillColumnProperty(Some(parent), field.column, tableProperties, colPropMap)
- }
- )
- }
- )
- }
-
- protected def fillColumnProperty(parentColumnName: Option[String],
- columnName: String,
- tableProperties: Map[String, String],
- colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
- val (tblPropKey, colProKey) = getKey(parentColumnName, columnName)
- val colProps = CommonUtil.getColumnProperties(tblPropKey, tableProperties)
- if (colProps.isDefined) {
- colPropMap.put(colProKey, colProps.get)
- }
- }
-
- def getKey(parentColumnName: Option[String],
- columnName: String): (String, String) = {
- if (parentColumnName.isDefined) {
- if (columnName == "val") {
- (parentColumnName.get, parentColumnName.get + "." + columnName)
- } else {
- (parentColumnName.get + "." + columnName, parentColumnName.get + "." + columnName)
- }
- } else {
- (columnName, columnName)
- }
- }
-
- /**
- * This will extract the no inverted columns fields.
- * By default all dimensions use inverted index.
- *
- * @param fields
- * @param tableProperties
- * @return
- */
- protected def extractNoInvertedIndexColumns(fields: Seq[Field],
- tableProperties: Map[String, String]):
- Seq[String] = {
- // check whether the column name is in fields
- var noInvertedIdxColsProps: Array[String] = Array[String]()
- var noInvertedIdxCols: Seq[String] = Seq[String]()
-
- if (tableProperties.get("NO_INVERTED_INDEX").isDefined) {
- noInvertedIdxColsProps =
- tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
- noInvertedIdxColsProps
- .map { noInvertedIdxColProp =>
- if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
- val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
- " does not exist in table. Please check create table statement."
- throw new MalformedCarbonCommandException(errormsg)
- }
- }
- }
- // check duplicate columns and only 1 col left
- val distinctCols = noInvertedIdxColsProps.toSet
- // extract the no inverted index columns
- fields.foreach(field => {
- if (distinctCols.exists(x => x.equalsIgnoreCase(field.column))) {
- noInvertedIdxCols :+= field.column
- }
- }
- )
- noInvertedIdxCols
- }
-
- /**
- * This will extract the Dimensions and NoDictionary Dimensions fields.
- * By default all string cols are dimensions.
- *
- * @param fields
- * @param tableProperties
- * @return
- */
- protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
- tableProperties: Map[String, String]):
- (Seq[Field], Seq[String]) = {
- var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
- var dictExcludeCols: Array[String] = Array[String]()
- var noDictionaryDims: Seq[String] = Seq[String]()
- var dictIncludeCols: Seq[String] = Seq[String]()
-
- // All excluded cols should be there in create table cols
- if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
- dictExcludeCols =
- tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
- dictExcludeCols
- .map { dictExcludeCol =>
- if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
- val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
- " does not exist in table. Please check create table statement."
- throw new MalformedCarbonCommandException(errormsg)
- } else {
- val dataType = fields.find(x =>
- x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
- if (isComplexDimDictionaryExclude(dataType)) {
- val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " +
- dictExcludeCol
- throw new MalformedCarbonCommandException(errormsg)
- } else if (!isStringAndTimestampColDictionaryExclude(dataType)) {
- val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
- " data type column: " + dictExcludeCol
- throw new MalformedCarbonCommandException(errorMsg)
- }
- }
- }
- }
- // All included cols should be there in create table cols
- if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
- dictIncludeCols =
- tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
- dictIncludeCols.map { distIncludeCol =>
- if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
- val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
- " does not exist in table. Please check create table statement."
- throw new MalformedCarbonCommandException(errormsg)
- }
- }
- }
-
- // include cols should contain exclude cols
- dictExcludeCols.foreach { dicExcludeCol =>
- if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
- val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
- " with DICTIONARY_INCLUDE. Please check create table statement."
- throw new MalformedCarbonCommandException(errormsg)
- }
- }
-
- // by default consider all String cols as dims and if any dictionary exclude is present then
- // add it to noDictionaryDims list. consider all dictionary excludes/include cols as dims
- fields.foreach(field => {
-
- if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
- val dataType = DataTypeUtil.getDataType(field.dataType.get.toUpperCase())
- if (dataType != DataType.TIMESTAMP && dataType != DataType.DATE ) {
- noDictionaryDims :+= field.column
- }
- dimFields += field
- } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
- dimFields += (field)
- } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
- dimFields += (field)
- }
- }
- )
-
- (dimFields.toSeq, noDictionaryDims)
- }
-
- /**
- * It fills non string dimensions in dimFields
- */
- def fillNonStringDimension(dictIncludeCols: Seq[String],
- field: Field, dimFields: LinkedHashSet[Field]) {
- var dictInclude = false
- if (dictIncludeCols.nonEmpty) {
- dictIncludeCols.foreach(dictIncludeCol =>
- if (field.column.equalsIgnoreCase(dictIncludeCol)) {
- dictInclude = true
- })
- }
- if (dictInclude) {
- dimFields += field
- }
- }
-
- /**
- * detect dimention data type
- *
- * @param dimensionDatatype
- */
- def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = {
- val dimensionType = Array("string", "array", "struct", "timestamp", "date", "char")
- dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype))
- }
-
- /**
- * detects whether complex dimension is part of dictionary_exclude
- */
- def isComplexDimDictionaryExclude(dimensionDataType: String): Boolean = {
- val dimensionType = Array("array", "struct")
- dimensionType.exists(x => x.equalsIgnoreCase(dimensionDataType))
- }
-
- /**
- * detects whether double or decimal column is part of dictionary_exclude
- */
- def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
- val dataTypes = Array("string", "timestamp")
- dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
- }
-
- /**
- * Extract the Measure Cols fields. By default all non string cols will be measures.
- *
- * @param fields
- * @param tableProperties
- * @return
- */
- protected def extractMsrColsFromFields(fields: Seq[Field],
- tableProperties: Map[String, String]): Seq[Field] = {
- var msrFields: Seq[Field] = Seq[Field]()
- var dictIncludedCols: Array[String] = Array[String]()
- var dictExcludedCols: Array[String] = Array[String]()
-
- // get all included cols
- if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
- dictIncludedCols =
- tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
- }
-
- // get all excluded cols
- if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
- dictExcludedCols =
- tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
- }
-
- // by default consider all non string cols as msrs. consider all include/ exclude cols as dims
- fields.foreach(field => {
- if (!isDetectAsDimentionDatatype(field.dataType.get)) {
- if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
- !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
- msrFields :+= field
- }
- }
- })
-
- msrFields
- }
-
- /**
- * Extract the DbName and table name.
- *
- * @param tableNameParts
- * @return
- */
- protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
- val (db, tableName) =
- tableNameParts.getChildren.asScala.map {
- case Token(part, Nil) => cleanIdentifier(part)
- } match {
- case Seq(tableOnly) => (None, tableOnly)
- case Seq(databaseName, table) => (Some(databaseName), table)
- }
-
- (db, tableName)
- }
-
- protected def cleanIdentifier(ident: String): String = {
- ident match {
- case escapedIdentifier(i) => i
- case plainIdent => plainIdent
- }
- }
-
- protected def getClauses(clauseNames: Seq[String], nodeList: Seq[ASTNode]): Seq[Option[Node]] = {
- var remainingNodes = nodeList
- val clauses = clauseNames.map { clauseName =>
- val (matches, nonMatches) = remainingNodes.partition(_.getText.toUpperCase == clauseName)
- remainingNodes = nonMatches ++ (if (matches.nonEmpty) {
- matches.tail
- } else {
- Nil
- })
- matches.headOption
- }
-
- if (remainingNodes.nonEmpty) {
- sys.error(
- s"""Unhandled clauses:
- |You are likely trying to use an unsupported carbon feature."""".stripMargin)
- }
- clauses
- }
-
- object Token {
- /** @return matches of the form (tokenName, children). */
- def unapply(t: Any): Option[(String, Seq[ASTNode])] = {
- t match {
- case t: ASTNode =>
- CurrentOrigin.setPosition(t.getLine, t.getCharPositionInLine)
- Some((t.getText,
- Option(t.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]]))
- case _ => None
- }
- }
- }
-
- /**
- * Extract the table properties token
- *
- * @param node
- * @return
- */
- protected def getProperties(node: Node): Seq[(String, String)] = {
- node match {
- case Token("TOK_TABLEPROPLIST", list) =>
- list.map {
- case Token("TOK_TABLEPROPERTY", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
- (unquoteString(key) -> unquoteString(value))
- }
- }
- }
-
- protected def unquoteString(str: String) = {
- str match {
- case singleQuotedString(s) => s.toLowerCase()
- case doubleQuotedString(s) => s.toLowerCase()
- case other => other
- }
- }
-
protected lazy val loadDataNew: Parser[LogicalPlan] =
LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
(INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
@@ -1015,143 +338,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
isOverwrite.isDefined)
}
- private def validateOptions(optionList: Option[List[(String, String)]]): Unit = {
-
- // validate with all supported options
- val options = optionList.get.groupBy(x => x._1)
- val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
- "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
- "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
- "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "USE_KETTLE", "DATEFORMAT"
- )
- var isSupported = true
- val invalidOptions = StringBuilder.newBuilder
- options.foreach(value => {
- if (!supportedOptions.exists(x => x.equalsIgnoreCase(value._1))) {
- isSupported = false
- invalidOptions.append(value._1)
- }
-
- }
- )
- if (!isSupported) {
- val errorMessage = "Error: Invalid option(s): " + invalidOptions.toString()
- throw new MalformedCarbonCommandException(errorMessage)
- }
-
- // COLUMNDICT and ALL_DICTIONARY_PATH can not be used together.
- if (options.exists(_._1.equalsIgnoreCase("COLUMNDICT")) &&
- options.exists(_._1.equalsIgnoreCase("ALL_DICTIONARY_PATH"))) {
- val errorMessage = "Error: COLUMNDICT and ALL_DICTIONARY_PATH can not be used together" +
- " in options"
- throw new MalformedCarbonCommandException(errorMessage)
- }
-
- if (options.exists(_._1.equalsIgnoreCase("MAXCOLUMNS"))) {
- val maxColumns: String = options.get("maxcolumns").get(0)._2
- try {
- maxColumns.toInt
- } catch {
- case ex: NumberFormatException =>
- throw new MalformedCarbonCommandException(
- "option MAXCOLUMNS can only contain integer values")
- }
- }
-
- // check for duplicate options
- val duplicateOptions = options filter {
- case (_, optionlist) => optionlist.size > 1
- }
- val duplicates = StringBuilder.newBuilder
- if (duplicateOptions.nonEmpty) {
- duplicateOptions.foreach(x => {
- duplicates.append(x._1)
- }
- )
- val errorMessage = "Error: Duplicate option(s): " + duplicates.toString()
- throw new MalformedCarbonCommandException(errorMessage)
- }
- }
-
- protected lazy val dbTableIdentifier: Parser[Seq[String]] =
- (ident <~ ".").? ~ (ident) ^^ {
- case databaseName ~ tableName =>
- if (databaseName.isDefined) {
- Seq(databaseName.get, tableName)
- } else {
- Seq(tableName)
- }
- }
-
- protected lazy val loadOptions: Parser[(String, String)] =
- (stringLit <~ "=") ~ stringLit ^^ {
- case opt ~ optvalue => (opt.trim.toLowerCase(), optvalue)
- case _ => ("", "")
- }
-
-
- protected lazy val dimCol: Parser[Field] = anyFieldDef
-
- protected lazy val primitiveTypes =
- STRING ^^^ "string" | INTEGER ^^^ "integer" |
- TIMESTAMP ^^^ "timestamp" | NUMERIC ^^^ "numeric" |
- BIGINT ^^^ "bigint" | SHORT ^^^ "smallint" |
- INT ^^^ "int" | DOUBLE ^^^ "double" | decimalType | DATE ^^^ "date" | charType
-
- /**
- * Matching the decimal(10,0) data type and returning the same.
- */
- private lazy val charType =
- CHAR ~ ("(" ~>numericLit <~ ")").? ^^ {
- case char ~ digit =>
- s"$char($digit)"
- }
-
- /**
- * Matching the decimal(10,0) data type and returning the same.
- */
- private lazy val decimalType =
- DECIMAL ~ ("(" ~> numericLit <~ ",") ~ (numericLit <~ ")") ^^ {
- case decimal ~ precision ~ scale =>
- s"$decimal($precision, $scale)"
- }
-
- protected lazy val nestedType: Parser[Field] = structFieldType | arrayFieldType |
- primitiveFieldType
-
- protected lazy val anyFieldDef: Parser[Field] =
- (ident | stringLit) ~ ((":").? ~> nestedType) ~ (IN ~> (ident | stringLit)).? ^^ {
- case e1 ~ e2 ~ e3 =>
- Field(e1, e2.dataType, Some(e1), e2.children, null, e3)
- }
-
- protected lazy val primitiveFieldType: Parser[Field] =
- (primitiveTypes) ^^ {
- case e1 =>
- Field("unknown", Some(e1), Some("unknown"), Some(null))
- }
-
- protected lazy val arrayFieldType: Parser[Field] =
- ((ARRAY ^^^ "array") ~> "<" ~> nestedType <~ ">") ^^ {
- case e1 =>
- Field("unknown", Some("array"), Some("unknown"),
- Some(List(Field("val", e1.dataType, Some("val"),
- e1.children))))
- }
-
- protected lazy val structFieldType: Parser[Field] =
- ((STRUCT ^^^ "struct") ~> "<" ~> repsep(anyFieldDef, ",") <~ ">") ^^ {
- case e1 =>
- Field("unknown", Some("struct"), Some("unknown"), Some(e1))
- }
-
- protected lazy val measureCol: Parser[Field] =
- (ident | stringLit) ~ (INTEGER ^^^ "integer" | NUMERIC ^^^ "numeric" | SHORT ^^^ "smallint" |
- BIGINT ^^^ "bigint" | DECIMAL ^^^ "decimal").? ~
- (AS ~> (ident | stringLit)).? ~ (IN ~> (ident | stringLit)).? ^^ {
- case e1 ~ e2 ~ e3 ~ e4 => Field(e1, e2, e3, Some(null))
- }
-
protected lazy val describeTable: Parser[LogicalPlan] =
((DESCRIBE | DESC) ~> opt(EXTENDED | FORMATTED)) ~ (ident <~ ".").? ~ ident ^^ {
case ef ~ db ~ tbl =>
@@ -1169,109 +355,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
}
}
- private def normalizeType(field: Field): Field = {
- val dataType = field.dataType.getOrElse("NIL")
- dataType match {
- case "string" => Field(field.column, Some("String"), field.name, Some(null), field.parent,
- field.storeType, field.schemaOrdinal
- )
- case "smallint" => Field(field.column, Some("SmallInt"), field.name, Some(null),
- field.parent, field.storeType, field.schemaOrdinal
- )
- case "integer" | "int" => Field(field.column, Some("Integer"), field.name, Some(null),
- field.parent, field.storeType, field.schemaOrdinal
- )
- case "long" => Field(field.column, Some("Long"), field.name, Some(null), field.parent,
- field.storeType, field.schemaOrdinal
- )
- case "double" => Field(field.column, Some("Double"), field.name, Some(null), field.parent,
- field.storeType, field.schemaOrdinal
- )
- case "timestamp" => Field(field.column, Some("Timestamp"), field.name, Some(null),
- field.parent, field.storeType, field.schemaOrdinal
- )
- case "numeric" => Field(field.column, Some("Numeric"), field.name, Some(null), field.parent,
- field.storeType, field.schemaOrdinal
- )
- case "array" => Field(field.column, Some("Array"), field.name,
- field.children.map(f => f.map(normalizeType(_))),
- field.parent, field.storeType, field.schemaOrdinal
- )
- case "struct" => Field(field.column, Some("Struct"), field.name,
- field.children.map(f => f.map(normalizeType(_))),
- field.parent, field.storeType, field.schemaOrdinal
- )
- case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
- field.storeType, field.schemaOrdinal
- )
- case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
- field.storeType, field.schemaOrdinal, field.precision, field.scale
- )
- // checking if the nested data type contains the child type as decimal(10,0),
- // if it is present then extracting the precision and scale. resetting the data type
- // with Decimal.
- case _ if (dataType.startsWith("decimal")) =>
- val (precision, scale) = getScaleAndPrecision(dataType)
- Field(field.column,
- Some("Decimal"),
- field.name,
- Some(null),
- field.parent,
- field.storeType, field.schemaOrdinal, precision,
- scale
- )
- case _ =>
- field
- }
- }
-
- private def addParent(field: Field): Field = {
- field.dataType.getOrElse("NIL") match {
- case "Array" => Field(field.column, Some("Array"), field.name,
- field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
- field.storeType, field.schemaOrdinal)
- case "Struct" => Field(field.column, Some("Struct"), field.name,
- field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
- field.storeType, field.schemaOrdinal)
- case _ => field
- }
- }
-
- private def appendParentForEachChild(field: Field, parentName: String): Field = {
- field.dataType.getOrElse("NIL") match {
- case "String" => Field(parentName + "." + field.column, Some("String"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "SmallInt" => Field(parentName + "." + field.column, Some("SmallInt"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Integer" => Field(parentName + "." + field.column, Some("Integer"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Long" => Field(parentName + "." + field.column, Some("Long"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Double" => Field(parentName + "." + field.column, Some("Double"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Timestamp" => Field(parentName + "." + field.column, Some("Timestamp"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Numeric" => Field(parentName + "." + field.column, Some("Numeric"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Array" => Field(parentName + "." + field.column, Some("Array"),
- Some(parentName + "." + field.name.getOrElse(None)),
- field.children
- .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
- parentName)
- case "Struct" => Field(parentName + "." + field.column, Some("Struct"),
- Some(parentName + "." + field.name.getOrElse(None)),
- field.children
- .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
- parentName)
- case "BigInt" => Field(parentName + "." + field.column, Some("BigInt"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
- case "Decimal" => Field(parentName + "." + field.column, Some("Decimal"),
- Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName,
- field.storeType, field.schemaOrdinal, field.precision, field.scale)
- case _ => field
- }
- }
-
protected lazy val showLoads: Parser[LogicalPlan] =
SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
(LIMIT ~> numericLit).? <~
@@ -1280,13 +363,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
ShowLoadsCommand(databaseName, tableName.toLowerCase(), limit)
}
- protected lazy val segmentId: Parser[String] =
- numericLit ^^ { u => u } |
- elem("decimal", p => {
- p.getClass.getSimpleName.equals("FloatLit") ||
- p.getClass.getSimpleName.equals("DecimalLit")
- }) ^^ (_.chars)
-
protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
(ident <~ ".").? ~ ident) <~
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d82290e..8ad1203 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -139,7 +139,7 @@ case class CreateTable(cm: TableModel) extends RunnableCommand {
val dbName = cm.databaseName
LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
- val tableInfo: TableInfo = TableNewProcessor(cm, sqlContext)
+ val tableInfo: TableInfo = TableNewProcessor(cm)
if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
sys.error("No Dimensions found. Table should have at least one dimesnion !")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 88e43fd..2943a52 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -17,9 +17,12 @@
package org.apache.spark.sql
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
-import org.apache.spark.sql.optimizer.{CarbonDecoderRelation}
+import org.apache.spark.sql.hive.{HiveContext, HiveSessionCatalog}
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.types.{StringType, TimestampType}
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
@@ -39,3 +42,34 @@ abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
+
+object getDB {
+
+ def getDatabaseName(dbName: Option[String], sparkSession: SparkSession): String = {
+ dbName.getOrElse(
+ sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCurrentDatabase)
+ }
+}
+
+/**
+ * Shows the loads (segments) of a table.
+ */
+case class ShowLoadsCommand(databaseNameOp: Option[String], table: String, limit: Option[String])
+ extends Command {
+
+ override def output: Seq[Attribute] = {
+ Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
+ AttributeReference("Status", StringType, nullable = false)(),
+ AttributeReference("Load Start Time", TimestampType, nullable = false)(),
+ AttributeReference("Load End Time", TimestampType, nullable = false)())
+ }
+}
+
+/**
+ * Describe formatted for hive table
+ */
+case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier) extends Command {
+
+ override def output: Seq[AttributeReference] =
+ Seq(AttributeReference("result", StringType, nullable = false)())
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 1fa710e..976a1b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -41,13 +41,13 @@ object CarbonEnv {
var initialized = false
- def init(sqlContext: SQLContext): Unit = {
+ def init(sparkSession: SparkSession): Unit = {
if (!initialized) {
val catalog = {
val storePath =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
LOGGER.info(s"carbon env initial: $storePath")
- new CarbonMetastore(sqlContext.sparkSession.conf, storePath)
+ new CarbonMetastore(sparkSession.conf, storePath)
}
carbonEnv = CarbonEnv(catalog)
initialized = true
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
new file mode 100644
index 0000000..748d292
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.SparkSession.Builder
+import org.apache.spark.sql.hive.CarbonSessionState
+import org.apache.spark.sql.internal.SessionState
+
+/**
+ * Session implementation for {org.apache.spark.sql.SparkSession}
+ * Implemented this class only to use our own SQL DDL commands.
+ * User needs to use {CarbonSession.getOrCreateCarbonSession} to create Carbon session.
+ * @param sc the SparkContext passed on to the SparkSession constructor
+ */
+class CarbonSession(sc: SparkContext) extends SparkSession(sc) {
+
+ // Runs as part of construction: hands this session to CarbonEnv.init.
+ CarbonEnv.init(this)
+
+ // Replaces the session state with CarbonSessionState, created lazily on first access.
+ @transient
+ override private[sql] lazy val sessionState: SessionState = new CarbonSessionState(this)
+}
+
+object CarbonSession {
+
+  /**
+   * Adds `getOrCreateCarbonSession` to a [[SparkSession.Builder]].
+   */
+  implicit class CarbonBuilder(builder: Builder) {
+
+    /**
+     * Returns a usable session, trying in this order: the current thread's active
+     * session, the global default session, and finally a newly created [[CarbonSession]]
+     * which is then registered as the global default. Builder options are applied to
+     * whichever session is returned.
+     *
+     * @return the reused or newly created session
+     */
+    def getOrCreateCarbonSession(): SparkSession = synchronized {
+      val options =
+        getValue("options", builder).asInstanceOf[scala.collection.mutable.HashMap[String, String]]
+      val userSuppliedContext: Option[SparkContext] =
+        getValue("userSuppliedContext", builder).asInstanceOf[Option[SparkContext]]
+
+      // Keeps a candidate only while its SparkContext is not stopped, and pushes the
+      // builder options into its runtime conf before handing it back.
+      def usable(candidate: Option[SparkSession]): Option[SparkSession] = {
+        val live = candidate.filter(s => (s ne null) && !s.sparkContext.isStopped)
+        live.foreach(s => options.foreach { case (k, v) => s.conf.set(k, v) })
+        live
+      }
+
+      // No active nor global default session. Create a new one.
+      def createAndRegister(): SparkSession = {
+        val sparkContext = userSuppliedContext.getOrElse {
+          // set app name if not given
+          val fallbackAppName = java.util.UUID.randomUUID().toString
+          val freshConf = new SparkConf()
+          options.foreach { case (k, v) => freshConf.set(k, v) }
+          if (!freshConf.contains("spark.app.name")) {
+            freshConf.setAppName(fallbackAppName)
+          }
+          val context = SparkContext.getOrCreate(freshConf)
+          // maybe this is an existing SparkContext, update its SparkConf which maybe used
+          // by SparkSession
+          options.foreach { case (k, v) => context.conf.set(k, v) }
+          if (!context.conf.contains("spark.app.name")) {
+            context.conf.setAppName(fallbackAppName)
+          }
+          context
+        }
+        val created = new CarbonSession(sparkContext)
+        options.foreach { case (k, v) => created.conf.set(k, v) }
+        SparkSession.setDefaultSession(created)
+
+        // When the application ends, clear the default session and the sql listener.
+        sparkContext.addSparkListener(new SparkListener {
+          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+            SparkSession.setDefaultSession(null)
+            SparkSession.sqlListener.set(null)
+          }
+        })
+        created
+      }
+
+      // Get the session from current thread's active session.
+      usable(SparkSession.getActiveSession).getOrElse {
+        // Global synchronization so we will only set the default session once.
+        SparkSession.synchronized {
+          // If the current thread does not have an active session, get it from the global
+          // session; otherwise fall through to creating one.
+          usable(SparkSession.getDefaultSession).getOrElse(createAndRegister())
+        }
+      }
+    }
+
+    /**
+     * It is a hack to get the private field from class.
+     *
+     * @param name    name of the member to read from the builder
+     * @param builder builder instance to reflect on
+     * @return the current value of the member; throws if no member has that name
+     */
+    def getValue(name: String, builder: Builder): Any = {
+      val mirror = scala.reflect.runtime.currentMirror
+      val member = mirror.classSymbol(builder.getClass).toType.members
+        .find(_.name.toString.equals(name))
+        .get
+        .asTerm
+      mirror.reflect(builder).reflectField(member).get
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index b639ea8..c78ddf3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -48,7 +48,7 @@ class CarbonSource extends CreatableRelationProvider
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
- CarbonEnv.init(sqlContext)
+ CarbonEnv.init(sqlContext.sparkSession)
// User should not specify path since only one store is supported in carbon currently,
// after we support multi-store, we can remove this limitation
require(!parameters.contains("path"), "'path' should not be specified, " +
@@ -88,7 +88,7 @@ class CarbonSource extends CreatableRelationProvider
sqlContext: SQLContext,
parameters: Map[String, String],
dataSchema: StructType): BaseRelation = {
- CarbonEnv.init(sqlContext)
+ CarbonEnv.init(sqlContext.sparkSession)
addLateDecodeOptimization(sqlContext.sparkSession)
val path = createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), parameters,
@@ -97,8 +97,10 @@ class CarbonSource extends CreatableRelationProvider
}
private def addLateDecodeOptimization(ss: SparkSession): Unit = {
- ss.sessionState.experimentalMethods.extraStrategies = Seq(new CarbonLateDecodeStrategy)
- ss.sessionState.experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+ if (ss.sessionState.experimentalMethods.extraStrategies.isEmpty) {
+ ss.sessionState.experimentalMethods.extraStrategies = Seq(new CarbonLateDecodeStrategy)
+ ss.sessionState.experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+ }
}
private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
@@ -131,7 +133,7 @@ class CarbonSource extends CreatableRelationProvider
val map = scala.collection.mutable.Map[String, String]();
parameters.foreach { x => map.put(x._1, x._2) }
val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map)
- CreateTable(cm).run(sparkSession)
+ CreateTable(cm, false).run(sparkSession)
CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
case ex: Exception =>
throw new Exception("do not have dbname and tablename for carbon table", ex)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 5508a94..51b79c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.CatalystTypeConverters._
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -31,8 +30,7 @@ import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.sql.types.{AtomicType, IntegerType}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
new file mode 100644
index 0000000..5211f1a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{CarbonEnv, ShowLoadsCommand, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/**
+ * Carbon strategies for ddl commands.
+ *
+ * Maps the DDL logical plans matched below to executable commands; any plan that is not
+ * matched yields `Nil`.
+ *
+ * @param sparkSession session used for carbon metastore lookups and for running nested
+ *                     commands
+ */
+class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
+
+  def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    plan match {
+      // Load into a table the carbon metastore knows about: run carbon's LoadTable.
+      case LoadDataCommand(identifier, path, isLocal, isOverwrite, partition)
+        if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) =>
+        ExecutedCommandExec(LoadTable(identifier.database, identifier.table, path, Seq(),
+          Map(), isOverwrite)) :: Nil
+      // Drop table is taken over only when isTablePathExists reports true.
+      case DropTableCommand(identifier, ifNotExists, isView)
+        if CarbonEnv.get.carbonMetastore
+          .isTablePathExists(identifier)(sparkSession) =>
+        ExecutedCommandExec(
+          CarbonDropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil
+      case ShowLoadsCommand(databaseName, table, limit) =>
+        ExecutedCommandExec(ShowLoads(databaseName, table, limit, plan.output)) :: Nil
+      // Create the carbon database directory first, then let the original command run.
+      case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
+        CarbonEnv.get.carbonMetastore.createDatabaseDirectory(dbName)
+        ExecutedCommandExec(createDb) :: Nil
+      case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
+        if (isCascade) {
+          // Only tables whose database matches dbName (ignoring case) are dropped.
+          val tablesInDB = CarbonEnv.get.carbonMetastore.getAllTables()
+            .filter(_.database.exists(_.equalsIgnoreCase(dbName)))
+          tablesInDB.foreach { tableName =>
+            CarbonDropTableCommand(true, Some(dbName), tableName.table).run(sparkSession)
+          }
+        }
+        CarbonEnv.get.carbonMetastore.dropDatabaseDirectory(dbName)
+        ExecutedCommandExec(drop) :: Nil
+      // Compaction is accepted only for carbon tables and only for minor/major types.
+      case alterTable@AlterTableCompaction(altertablemodel) =>
+        val isCarbonTable = CarbonEnv.get.carbonMetastore
+          .tableExists(TableIdentifier(altertablemodel.tableName,
+            altertablemodel.dbName))(sparkSession)
+        if (isCarbonTable) {
+          if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
+              altertablemodel.compactionType.equalsIgnoreCase("major")) {
+            ExecutedCommandExec(alterTable) :: Nil
+          } else {
+            throw new MalformedCarbonCommandException(
+              "Unsupported alter operation on carbon table")
+          }
+        } else {
+          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+        }
+      // Formatted describe of a carbon table: plan the relation and hand the physical plan
+      // to DescribeCommandFormatted.
+      case desc@DescribeTableCommand(identifier, partitionSpec, isExtended, isFormatted)
+        if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) && isFormatted =>
+        val resolvedTable =
+          sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed
+        val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan
+        ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil
+      case _ => Nil
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 68ad4d6..8f97961 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -17,18 +17,27 @@
package org.apache.spark.sql.execution.command
+import java.io.File
+import java.text.SimpleDateFormat
+
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
+import org.apache.spark.sql.types.TimestampType
import org.apache.spark.util.FileUtils
+import org.codehaus.jackson.map.ObjectMapper
+import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.carbon.path.CarbonStorePath
@@ -36,19 +45,34 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastorage.store.impl.FileFactory
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.lcm.status.SegmentStatusManager
import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, GlobalDictionaryUtil}
+object Checker {
+  /**
+   * Verifies that the carbon metastore knows the given table.
+   *
+   * @param dbName    optional database name of the table
+   * @param tableName name of the table to look up
+   * @param session   session passed to the metastore lookup
+   * @throws IllegalArgumentException when the metastore does not report the table
+   */
+  def validateTableExists(
+      dbName: Option[String],
+      tableName: String,
+      session: SparkSession): Unit = {
+    val identifier = TableIdentifier(tableName, dbName)
+    if (!CarbonEnv.get.carbonMetastore.tableExists(identifier)(session)) {
+      // Report a plain database name; interpolating the Option itself would print
+      // "Some(db)" or "None" in the message.
+      val resolvedDb = dbName.getOrElse(session.catalog.currentDatabase)
+      val err = s"table $resolvedDb.$tableName not found"
+      LogServiceFactory.getLogService(this.getClass.getName).error(err)
+      throw new IllegalArgumentException(err)
+    }
+  }
+}
+
/**
* Command for the compaction in alter table command
*
* @param alterTableModel
*/
-case class AlterTableCompaction(alterTableModel: AlterTableModel) {
+case class AlterTableCompaction(alterTableModel: AlterTableModel) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -109,24 +133,59 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) {
}
}
-case class CreateTable(cm: TableModel) {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand {
def run(sparkSession: SparkSession): Seq[Row] = {
- cm.databaseName = cm.databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+ CarbonEnv.init(sparkSession)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession)
val tbName = cm.tableName
val dbName = cm.databaseName
LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
- val tableInfo: TableInfo = TableNewProcessor(cm, sparkSession.sqlContext)
+ val tableInfo: TableInfo = TableNewProcessor(cm)
if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
sys.error("No Dimensions found. Table should have at least one dimesnion !")
}
- // Add Database to catalog and persist
- val catalog = CarbonEnv.get.carbonMetastore
- val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
- LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
+
+ if (sparkSession.sessionState.catalog.listTables(dbName)
+ .exists(_.table.equalsIgnoreCase(tbName))) {
+ if (!cm.ifNotExistsSet) {
+ LOGGER.audit(
+ s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
+ s"Table [$tbName] already exists under database [$dbName]")
+ sys.error(s"Table [$tbName] already exists under database [$dbName]")
+ }
+ } else {
+ // Add Database to catalog and persist
+ val catalog = CarbonEnv.get.carbonMetastore
+ // Need to fill partitioner class when we support partition
+ val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
+ if (createDSTable) {
+ try {
+ sparkSession.sql(
+ s"""CREATE TABLE $dbName.$tbName
+ |(${(cm.dimCols ++ cm.msrCols).map(f => f.rawSchema).mkString(",")})
+ |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+ s""" OPTIONS (tableName "$tbName", dbName "${dbName}", tablePath "$tablePath") """)
+ } catch {
+ case e: Exception =>
+ val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
+ // call the drop table to delete the created table.
+
+ CarbonEnv.get.carbonMetastore
+ .dropTable(catalog.storePath, identifier)(sparkSession)
+
+ LOGGER.audit(s"Table creation with Database name [$dbName] " +
+ s"and Table name [$tbName] failed")
+ throw e
+ }
+ }
+
+ LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
+ }
+
Seq.empty
}
@@ -136,6 +195,54 @@ case class CreateTable(cm: TableModel) {
}
}
+/**
+ * Runnable command that passes a list of load ids to `CarbonStore.deleteLoadById`.
+ *
+ * @param loadids        ids of the loads to delete
+ * @param databaseNameOp optional database name
+ * @param tableName      name of the table
+ */
+case class DeleteLoadsById(
+    loadids: Seq[String],
+    databaseNameOp: Option[String],
+    tableName: String) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val databaseName = getDB.getDatabaseName(databaseNameOp, sparkSession)
+    CarbonStore.deleteLoadById(loadids, databaseName, tableName)
+    Seq.empty
+  }
+
+  // validates load ids
+  // NOTE(review): nothing in this class calls this check.
+  private def validateLoadIds: Unit = {
+    if (loadids.isEmpty) {
+      throw new MalformedCarbonCommandException("Error: Segment id(s) should not be empty.")
+    }
+  }
+}
+
+/**
+ * Runnable command that passes a load date to `CarbonStore.deleteLoadByDate`.
+ *
+ * @param databaseNameOp optional database name
+ * @param tableName      name of the table
+ * @param dateField      date field of the command (not read by `run`)
+ * @param loadDate       load date handed to the store
+ */
+case class DeleteLoadsByLoadDate(
+    databaseNameOp: Option[String],
+    tableName: String,
+    dateField: String,
+    loadDate: String) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.TableModel.tableSchema")
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val databaseName = getDB.getDatabaseName(databaseNameOp, sparkSession)
+    CarbonStore.deleteLoadByDate(loadDate, databaseName, tableName)
+    Seq.empty
+  }
+
+}
object LoadTable {
@@ -205,7 +312,7 @@ case class LoadTable(
options: scala.collection.immutable.Map[String, String],
isOverwriteExist: Boolean = false,
var inputSqlString: String = null,
- dataFrame: Option[DataFrame] = None) {
+ dataFrame: Option[DataFrame] = None) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -433,3 +540,191 @@ case class LoadTable(
}
}
}
+
+/**
+ * Passes a load date to `CarbonStore.deleteLoadByDate` after checking the table exists.
+ *
+ * @param databaseNameOp optional database name
+ * @param tableName      name of the table
+ * @param dateField      date field of the command (not read by `run`)
+ * @param loadDate       load date handed to the store
+ */
+private[sql] case class DeleteLoadByDate(
+    databaseNameOp: Option[String],
+    tableName: String,
+    dateField: String,
+    loadDate: String) {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val databaseName = getDB.getDatabaseName(databaseNameOp, sparkSession)
+    CarbonStore.deleteLoadByDate(loadDate, databaseName, tableName)
+    Seq.empty
+  }
+
+}
+
+/**
+ * Runnable command that calls `CarbonStore.cleanFiles` for a table, using the store path
+ * taken from the table's carbon relation.
+ *
+ * @param databaseNameOp optional database name
+ * @param tableName      name of the table
+ */
+case class CleanFiles(
+    databaseNameOp: Option[String],
+    tableName: String) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val carbonRelation = CarbonEnv.get.carbonMetastore
+      .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
+    val databaseName = getDB.getDatabaseName(databaseNameOp, sparkSession)
+    val storePath = carbonRelation.tableMeta.storePath
+    CarbonStore.cleanFiles(databaseName, tableName, storePath)
+    Seq.empty
+  }
+}
+
+/**
+ * Runnable command that returns the rows produced by `CarbonStore.showSegments`.
+ *
+ * @param databaseNameOp optional database name
+ * @param tableName      name of the table
+ * @param limit          optional limit handed to the store
+ * @param output         output attributes of this command
+ */
+case class ShowLoads(
+    databaseNameOp: Option[String],
+    tableName: String,
+    limit: Option[String],
+    override val output: Seq[Attribute]) extends RunnableCommand {
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val databaseName = getDB.getDatabaseName(databaseNameOp, sparkSession)
+    CarbonStore.showSegments(databaseName, tableName, limit)
+  }
+}
+
+/**
+ * Runnable command that drops a carbon table while holding the drop-table lock and then
+ * removes leftover table files and the bad record log.
+ *
+ * @param ifExistsSet    flag carried by the command (not read by `run`)
+ * @param databaseNameOp optional database name
+ * @param tableName      name of the table to drop
+ */
+case class CarbonDropTableCommand(ifExistsSet: Boolean,
+    databaseNameOp: Option[String],
+    tableName: String)
+  extends RunnableCommand {
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
+    val identifier = TableIdentifier(tableName, Option(dbName))
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
+    val carbonLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTableIdentifier, LockUsage.DROP_TABLE_LOCK)
+    val storePath = CarbonEnv.get.carbonMetastore.storePath
+    var isLocked = false
+    try {
+      isLocked = carbonLock.lockWithRetries()
+      // Without the lock the drop is abandoned; sys.error throws from here.
+      if (!isLocked) {
+        LOGGER.audit(s"Dropping table $dbName.$tableName failed as the Table is locked")
+        sys.error("Table is locked for deletion. Please try after some time")
+      }
+      logInfo("Successfully able to get the lock for drop.")
+      LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
+      CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sparkSession)
+      LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
+    } finally {
+      if (carbonLock != null && isLocked) {
+        if (!carbonLock.unlock()) {
+          logError("Unable to unlock Table MetaData")
+        } else {
+          logInfo("Table MetaData Unlocked Successfully after dropping the table")
+          // deleting any remaining files.
+          val metadataDir = CarbonStorePath
+            .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
+          val metadataFileType = FileFactory.getFileType(metadataDir)
+          if (FileFactory.isFileExist(metadataDir, metadataFileType)) {
+            // The parent of the metadata directory is what gets removed.
+            val metadataFolder = FileFactory.getCarbonFile(metadataDir, metadataFileType)
+            CarbonUtil.deleteFoldersAndFiles(metadataFolder.getParentFile)
+          }
+          // delete bad record log after drop table
+          val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + tableName)
+          val badLogFileType = FileFactory.getFileType(badLogPath)
+          if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
+            val badLogFolder = FileFactory.getCarbonFile(badLogPath, badLogFileType)
+            CarbonUtil.deleteFoldersAndFiles(badLogFolder)
+          }
+        }
+      }
+    }
+    Seq.empty
+  }
+}
+
+/**
+ * Builds the formatted describe output of a carbon table: one row per column of the
+ * child schema, followed by table details, column properties and column groups. Each
+ * returned Row holds a single formatted string.
+ *
+ * @param child         plan whose schema supplies the column names and data types
+ * @param output        output attributes of this command
+ * @param tblIdentifier identifier used to look up the carbon relation
+ */
+private[sql] case class DescribeCommandFormatted(
+    child: SparkPlan,
+    override val output: Seq[Attribute],
+    tblIdentifier: TableIdentifier)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+    val tableId = relation.tableMeta.carbonTableIdentifier
+    val mapper = new ObjectMapper()
+    // Collects "<column>.<json properties>," entries while the column rows are built.
+    val colProps = StringBuilder.newBuilder
+    val columnRows: Seq[(String, String, String)] = child.schema.fields.map { field =>
+      val comment =
+        if (!relation.metaData.dims.contains(field.name)) {
+          "MEASURE"
+        } else {
+          val dimension = relation.metaData.carbonTable.getDimensionByName(
+            tableId.getTableName,
+            field.name)
+          val properties = dimension.getColumnProperties
+          if (null != properties && properties.size() > 0) {
+            colProps.append(field.name).append(".")
+              .append(mapper.writeValueAsString(properties))
+              .append(",")
+          }
+          val dictionaryOnly = dimension.hasEncoding(Encoding.DICTIONARY) &&
+                               !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+          if (dictionaryOnly) "DICTIONARY, KEY COLUMN" else "KEY COLUMN"
+        }
+      (field.name, field.dataType.simpleString, comment)
+    }
+    val rawProps = colProps.toString()
+    // drops additional comma at end
+    val colPropStr = if (rawProps.trim().length() > 0) rawProps.dropRight(1) else rawProps
+    val carbonTable = relation.tableMeta.carbonTable
+    val columnPropertyRow =
+      if (colPropStr.length() > 0) (colPropStr, "", "") else ("ADAPTIVE", "", "")
+    val tableDimensions = carbonTable.getDimensionByTableName(tableId.getTableName)
+    val rows = columnRows ++ Seq(
+      ("", "", ""),
+      ("##Detailed Table Information", "", ""),
+      ("Database Name: ", tableId.getDatabaseName, ""),
+      ("Table Name: ", tableId.getTableName, ""),
+      ("CARBON Store Path: ", relation.tableMeta.storePath, ""),
+      ("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""),
+      ("", "", ""),
+      ("##Detailed Column property", "", ""),
+      columnPropertyRow) ++ getColumnGroups(tableDimensions.asScala.toList)
+    rows.map { case (name, dataType, comment) =>
+      Row(f"$name%-36s $dataType%-80s $comment%-72s")
+    }
+  }
+
+  /**
+   * Lists the column groups: dimensions are grouped by column group id, id -1 is left
+   * out, and the remaining groups are numbered from 1 in ascending id order.
+   */
+  private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
+    val header: Seq[(String, String, String)] =
+      Seq(("", "", ""), ("##Column Group Information", "", ""))
+    val groupedColumnNames = dimensions
+      .groupBy(_.columnGroupId())
+      .filter { case (groupId, _) => groupId != -1 }
+      .toSeq
+      .sortBy(_._1)
+      .map { case (_, members) => members.map(_.getColName).mkString(", ") }
+    val groupRows = groupedColumnNames.zipWithIndex.map { case (columns, position) =>
+      (s"Column Group ${position + 1}", columns, "")
+    }
+    header ++ groupRows
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 9638b8f..f174126 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -187,6 +187,15 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
tables.nonEmpty
}
+  /**
+   * Tells whether the loaded table metadata contains the identified table. Database and
+   * table names are compared ignoring case; when the identifier has no database the
+   * session's current database is used.
+   */
+  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+    checkSchemasModifiedTimeAndReloadTables()
+    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val tblName = tableIdentifier.table
+    metadata.tablesMeta.exists { meta =>
+      val id = meta.carbonTableIdentifier
+      id.getDatabaseName.equalsIgnoreCase(dbName) && id.getTableName.equalsIgnoreCase(tblName)
+    }
+  }
+
def loadMetadata(metadataPath: String, queryId: String): MetaData = {
val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
val statistic = new QueryStatistic()
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..066acce
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
+import org.apache.spark.sql.execution.command.DDLStrategy
+import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+
+/**
+ * Session state implementation to override sql parser and adding strategies
+ * @param sparkSession the session passed to HiveSessionState and to DDLStrategy
+ */
+class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
+
+ // Use the Carbon sql parser, built from this state's conf.
+ override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf)
+
+ // Runs at construction time: replaces the extra strategies and optimizations with the
+ // Carbon ones.
+ experimentalMethods.extraStrategies =
+ Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
+ experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
new file mode 100644
index 0000000..028286c
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parser
+
+import scala.language.implicitConversions
+
+import org.apache.spark.sql.ShowLoadsCommand
+import org.apache.spark.sql.catalyst.CarbonDDLSqlParser
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.command._
+
+/**
+ * Parser for the Carbon DDL/DML statements handled in the unified context: data load,
+ * segment deletion, file clean-up, segment listing, compaction, and EXPLAIN of those commands.
+ *
+ * The grammar notes on each rule are written in terms of the keyword tokens the rule uses.
+ *
+ * TODO remove the duplicate code and add the common methods to common class.
+ */
+class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
+
+  /**
+   * Parses one statement into a logical plan.
+   *
+   * The whole parse runs while holding this parser's monitor. A resulting [[LoadTable]]
+   * additionally gets the original statement text stored in `inputSqlString`.
+   *
+   * @param input statement text
+   * @return the plan produced by the `start` rule
+   * @throws RuntimeException (raised through `sys.error`) when `input` is not matched
+   */
+  override def parse(input: String): LogicalPlan = synchronized {
+    // Initialize the Keywords.
+    initLexical
+    phrase(start)(new lexical.Scanner(input)) match {
+      case Success(load: LoadTable, _) =>
+        load.inputSqlString = input
+        load
+      case Success(other, _) => other
+      case noSuccess => sys.error(noSuccess.toString)
+    }
+  }
+
+  /** Entry rule: an EXPLAIN-wrapped command, otherwise a bare command. */
+  protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
+
+  /** Any supported command; the alternatives are attempted left to right. */
+  protected lazy val startCommand: Parser[LogicalPlan] =
+    loadManagement | showLoads | alterTable
+
+  /** Segment deletion (by id or by start time), CLEAN FILES and LOAD DATA. */
+  protected lazy val loadManagement: Parser[LogicalPlan] =
+    deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
+
+  /**
+   * `ALTER TABLE [db.]table COMPACT <string literal> [;]`
+   *
+   * NOTE(review): unlike the other rules in this class the table name is passed on exactly
+   * as written (not lower-cased), and the last `AlterTableModel` argument is `null` -
+   * confirm both are intended.
+   */
+  protected lazy val alterTable: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) <~ opt(";") ^^ {
+      case database ~ tableName ~ (_ ~ compactionType) =>
+        AlterTableCompaction(AlterTableModel(database, tableName, compactionType, null))
+    }
+
+  /**
+   * `LOAD DATA [LOCAL] INPATH <string literal> [OVERWRITE] INTO TABLE [db.]table
+   * [OPTIONS (opt, ...)] [;]`
+   *
+   * The LOCAL token is accepted but its presence is not recorded. When an OPTIONS clause is
+   * present it is passed to `validateOptions` before being turned into a map; the table name
+   * is lower-cased.
+   */
+  protected lazy val loadDataNew: Parser[LogicalPlan] =
+    LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
+    (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
+    (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
+      case path ~ overwrite ~ (database ~ tableName) ~ declaredOptions =>
+        if (declaredOptions.isDefined) {
+          validateOptions(declaredOptions)
+        }
+        val options = declaredOptions.getOrElse(List.empty[(String, String)]).toMap
+        LoadTable(database, tableName.toLowerCase(), path, Seq(), options,
+          overwrite.isDefined)
+    }
+
+  /**
+   * `DELETE SEGMENT id, ... FROM TABLE [db.]table [;]`
+   *
+   * NOTE(review): `repsep` also matches an empty id list - confirm that is acceptable.
+   */
+  protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
+    DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
+    (ident <~ ".").? ~ ident) <~
+    opt(";") ^^ {
+      case segmentIds ~ (database ~ tableName) =>
+        DeleteLoadsById(segmentIds, database, tableName.toLowerCase())
+    }
+
+  /** `DELETE SEGMENTS FROM TABLE [db.]table WHERE STARTTIME BEFORE <string literal> [;]` */
+  protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
+    DELETE ~> SEGMENTS ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    (WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~
+    opt(";") ^^ {
+      case database ~ tableName ~ (dateField ~ dateValue) =>
+        DeleteLoadsByLoadDate(database, tableName.toLowerCase(), dateField, dateValue)
+    }
+
+  /** `CLEAN FILES FOR TABLE [db.]table [;]` */
+  protected lazy val cleanFiles: Parser[LogicalPlan] =
+    CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
+      case database ~ tableName => CleanFiles(database, tableName.toLowerCase())
+    }
+
+  /**
+   * `EXPLAIN [EXTENDED] <command>`
+   *
+   * Only a `CreateTable` plan is wrapped as-is (carrying the EXTENDED flag). Every other
+   * command yields `ExplainCommand(OneRowRelation)` and the flag is not forwarded.
+   *
+   * NOTE(review): none of the rule actions visible in this class builds a `CreateTable`,
+   * so the first case looks unreachable from here - confirm.
+   */
+  protected lazy val explainPlan: Parser[LogicalPlan] =
+    (EXPLAIN ~> opt(EXTENDED)) ~ startCommand ^^ {
+      case extendedFlag ~ (creation: CreateTable) =>
+        ExplainCommand(creation, extended = extendedFlag.isDefined)
+      case _ =>
+        ExplainCommand(OneRowRelation)
+    }
+
+  /** `SHOW SEGMENTS FOR TABLE [db.]table [LIMIT <numeric literal>] [;]` */
+  protected lazy val showLoads: Parser[LogicalPlan] =
+    SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    (LIMIT ~> numericLit).? <~
+    opt(";") ^^ {
+      case database ~ tableName ~ rowLimit =>
+        ShowLoadsCommand(database, tableName.toLowerCase(), rowLimit)
+    }
+}