You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/07/12 18:27:39 UTC
[1/2] incubator-carbondata git commit: Added DataSourceRegister to
simplify the format name and clean up some unused code
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 367b9b2d2 -> 90139be56
Added DataSourceRegister to simplify the format name and clean up some unused code
Simplified the storage name
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8f9f9f44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8f9f9f44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8f9f9f44
Branch: refs/heads/master
Commit: 8f9f9f44e703fc09586696184f7efa24cdd9346d
Parents: 367b9b2
Author: ravipesala <ra...@gmail.com>
Authored: Fri Jul 8 18:22:46 2016 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Tue Jul 12 23:55:53 2016 +0530
----------------------------------------------------------------------
assembly/pom.xml | 3 +
.../org/carbondata/examples/CarbonExample.scala | 2 +-
.../examples/DataFrameAPIExample.scala | 9 +-
integration/spark/pom.xml | 3 +
.../spark/sql/CarbonCatalystOperators.scala | 90 ---
.../org/apache/spark/sql/CarbonContext.scala | 6 +-
.../spark/sql/CarbonDatasourceRelation.scala | 9 +-
.../org/apache/spark/sql/CarbonSqlParser.scala | 20 +-
.../execution/command/carbonTableSchema.scala | 541 +------------------
.../spark/sql/hive/CarbonStrategies.scala | 10 -
....apache.spark.sql.sources.DataSourceRegister | 1 +
11 files changed, 28 insertions(+), 666 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f9f9f44/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index ef7d5d9..e41b99a 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -112,6 +112,9 @@
<Compile-Timestamp>2015-07-15 02.59.16</Compile-Timestamp>
</manifestEntries>
</transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
+ </transformer>
</transformers>
</configuration>
</execution>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f9f9f44/examples/src/main/scala/org/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/carbondata/examples/CarbonExample.scala b/examples/src/main/scala/org/carbondata/examples/CarbonExample.scala
index 20b2eae..dc8a91e 100644
--- a/examples/src/main/scala/org/carbondata/examples/CarbonExample.scala
+++ b/examples/src/main/scala/org/carbondata/examples/CarbonExample.scala
@@ -36,7 +36,7 @@ object CarbonExample {
CREATE TABLE IF NOT EXISTS t3
(ID Int, date Timestamp, country String,
name String, phonetype String, serialname String, salary Int)
- STORED BY 'org.apache.carbondata.format'
+ STORED BY 'carbondata'
""")
cc.sql(s"""
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f9f9f44/examples/src/main/scala/org/carbondata/examples/DataFrameAPIExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/carbondata/examples/DataFrameAPIExample.scala b/examples/src/main/scala/org/carbondata/examples/DataFrameAPIExample.scala
index 75c1aa7..6e412e5 100644
--- a/examples/src/main/scala/org/carbondata/examples/DataFrameAPIExample.scala
+++ b/examples/src/main/scala/org/carbondata/examples/DataFrameAPIExample.scala
@@ -17,10 +17,7 @@
package org.carbondata.examples
-import java.io.File
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{CarbonContext, SaveMode}
+import org.apache.spark.sql.SaveMode
import org.carbondata.examples.util.InitForExamples
@@ -38,14 +35,14 @@ object DataFrameAPIExample {
// save dataframe to carbon file
df.write
- .format("org.apache.spark.sql.CarbonSource")
+ .format("carbondata")
.option("tableName", "carbon1")
.mode(SaveMode.Overwrite)
.save()
// use datasource api to read
val in = cc.read
- .format("org.apache.spark.sql.CarbonSource")
+ .format("carbondata")
.option("tableName", "carbon1")
.load()
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f9f9f44/integration/spark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index 143c09d..f9f023e 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -132,6 +132,9 @@
<testSourceDirectory>src/test/scala</testSourceDirectory>
<resources>
<resource>
+ <directory>src/resources</directory>
+ </resource>
+ <resource>
<directory>.</directory>
<includes>
<include>CARBON_SPARK_INTERFACELogResource.properties</include>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f9f9f44/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index d8a273f..14c1cca 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
-import org.apache.spark.sql.execution.command.tableModel
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
import org.apache.spark.sql.types._
@@ -49,95 +48,6 @@ object getDB {
}
/**
- * Shows schemas
- */
-case class ShowSchemaCommand(cmd: Option[String]) extends LogicalPlan with Command {
- override def children: Seq[LogicalPlan] = Seq.empty
-
- override def output: Seq[Attribute] = {
- Seq(AttributeReference("result", StringType, nullable = false)())
- }
-}
-
-/**
- * Shows AggregateTables of a schema
- */
-case class ShowCreateCubeCommand(cm: tableModel) extends LogicalPlan with Command {
- override def children: Seq[LogicalPlan] = Seq.empty
-
- override def output: Seq[Attribute] = {
- Seq(AttributeReference("createCubeCmd", StringType, nullable = false)())
- }
-}
-
-/**
- * Shows AggregateTables of a schema
- */
-case class ShowAggregateTablesCommand(schemaNameOp: Option[String])
- extends LogicalPlan with Command {
- override def children: Seq[LogicalPlan] = Seq.empty
-
- override def output: Seq[Attribute] = {
- Seq(AttributeReference("tableName", StringType, nullable = false)())
- }
-}
-
-/**
- * Shows cubes in schema
- */
-case class ShowCubeCommand(schemaNameOp: Option[String]) extends LogicalPlan with Command {
- override def children: Seq[LogicalPlan] = Seq.empty
-
- override def output: Seq[Attribute] = {
- Seq(AttributeReference("tableName", StringType, nullable = false)(),
- AttributeReference("isRegisteredWithSpark", BooleanType, nullable = false)())
- }
-}
-
-
-/**
- * Shows cubes in schema
- */
-case class ShowAllCubeCommand() extends LogicalPlan with Command {
- override def children: Seq[LogicalPlan] = Seq.empty
-
- override def output: Seq[Attribute] = {
- Seq(AttributeReference("dbName", StringType, nullable = false)(),
- AttributeReference("tableName", StringType, nullable = false)(),
- AttributeReference("isRegisteredWithSpark", BooleanType, nullable = false)())
- }
-}
-
-case class SuggestAggregateCommand(
- script: Option[String],
- sugType: Option[String],
- schemaName: Option[String],
- cubeName: String) extends LogicalPlan with Command {
- override def children: Seq[LogicalPlan] = Seq.empty
-
- override def output: Seq[Attribute] = {
- Seq(AttributeReference("SuggestionType", StringType, nullable = false)(),
- AttributeReference("Suggestion", StringType, nullable = false)())
- }
-}
-
-/**
- * Shows cubes in schema
- */
-case class ShowTablesDetailedCommand(schemaNameOp: Option[String])
- extends LogicalPlan with Command {
- override def children: Seq[LogicalPlan] = Seq.empty
-
- override def output: Seq[Attribute] = {
- Seq(AttributeReference("TABLE_CAT", StringType, nullable = true)(),
- AttributeReference("TABLE_SCHEM", StringType, nullable = false)(),
- AttributeReference("TABLE_NAME", StringType, nullable = false)(),
- AttributeReference("TABLE_TYPE", StringType, nullable = false)(),
- AttributeReference("REMARKS", StringType, nullable = false)())
- }
-}
-
-/**
* Shows Loads in a cube
*/
case class ShowLoadsCommand(schemaNameOp: Option[String], cube: String, limit: Option[String])
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f9f9f44/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 44f5be7..a317c1c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -117,6 +117,11 @@ class CarbonContext(
}
object CarbonContext {
+
+ val datasourceName: String = "org.apache.carbondata.format"
+
+ val datasourceShortName: String = "carbondata"
+
/**
* @param schemaName - Schema Name
* @param cubeName - Cube Name
@@ -187,5 +192,4 @@ object CarbonContext {
cache(sc) = cc
}
- def datasourceName: String = "org.apache.carbondata.format"
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f9f9f44/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 2412e56..5613d71 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -42,7 +42,12 @@ import org.carbondata.spark.{CarbonOption, _}
* Creates carbon relations
*/
class CarbonSource
- extends RelationProvider with CreatableRelationProvider with HadoopFsRelationProvider {
+ extends RelationProvider
+ with CreatableRelationProvider
+ with HadoopFsRelationProvider
+ with DataSourceRegister {
+
+ override def shortName(): String = "carbondata"
/**
* Returns a new base relation with the given parameters.
@@ -92,7 +97,7 @@ class CarbonSource
sys.error(s"ErrorIfExists mode, path $storePath already exists.")
case (SaveMode.Overwrite, true) =>
val cc = CarbonContext.getInstance(sqlContext.sparkContext)
- cc.sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
+ cc.sql(s"DROP CUBE IF EXISTS ${ options.dbName }.${ options.tableName }")
(true, false)
case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
(true, false)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f9f9f44/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index b0870d3..1694a7e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -56,7 +56,6 @@ class CarbonSqlParser()
protected val CLASS = Keyword("CLASS")
protected val CLEAN = Keyword("CLEAN")
protected val COLUMNS = Keyword("COLUMNS")
- protected val CUBES = Keyword("CUBES")
protected val DATA = Keyword("DATA")
protected val DELETE = Keyword("DELETE")
protected val DESCRIBE = Keyword("DESCRIBE")
@@ -355,7 +354,8 @@ class CarbonSqlParser()
case _ => // Unsupport features
}
- if (!storedBy.equals(CarbonContext.datasourceName)) {
+ if (!(storedBy.equals(CarbonContext.datasourceName) ||
+ storedBy.equals(CarbonContext.datasourceShortName))) {
// TODO: should execute by Hive instead of error
sys.error("Not a carbon format request")
}
@@ -370,7 +370,7 @@ class CarbonSqlParser()
tableProperties)
// get logical plan.
- CreateCube(tableModel)
+ CreateTable(tableModel)
}
}
@@ -959,20 +959,6 @@ class CarbonSqlParser()
case _ => ("", "")
}
- protected lazy val showCube: Parser[LogicalPlan] =
- SHOW ~> CUBES ~> (IN ~> ident).? ~ (DETAIL).? <~ opt(";") ^^ {
- case schema ~ detail =>
- if (detail.isDefined) {
- ShowTablesDetailedCommand(schema)
- } else {
- ShowCubeCommand(schema)
- }
- }
- protected lazy val showAllCubes: Parser[LogicalPlan] =
- SHOW ~> ALL ~> CUBES <~ opt(";") ^^ {
- case _ => ShowAllCubeCommand()
- }
-
protected lazy val dimCol: Parser[Field] = anyFieldDef
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f9f9f44/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 826ac7a..7ae6831 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -733,435 +733,6 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
}
-private[sql] case class ShowCreateTable(cm: tableModel, override val output: Seq[Attribute])
- extends RunnableCommand {
-
- val numericTypes = Seq(CarbonCommonConstants.INTEGER_TYPE, CarbonCommonConstants.DOUBLE_TYPE,
- CarbonCommonConstants.LONG_TYPE, CarbonCommonConstants.FLOAT_TYPE)
-
- def run(sqlContext: SQLContext): Seq[Row] = {
-
- var levels = Seq[Level]()
- var levelsToCheckDuplicate = Seq[Level]()
- var measures = Seq[Measure]()
- var dimFileDimensions = Seq[Dimension]()
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- var command = new StringBuilder
- var relation = new StringBuilder
-
- cm.schemaName = getDB.getDatabaseName(cm.schemaNameOp, sqlContext)
-
- command = command.append("CREATE Table ").append(cm.schemaName).append(".").append(cm.cubeName)
- .append(" ")
- relation = relation.append("")
-
- if (cm.fromKeyword.equalsIgnoreCase(CarbonCommonConstants.FROM)) {
- val df = getDataFrame(cm.source, sqlContext)
-
- // Will maintain the list of all the columns specified by the user.
- // In case if relation is defined. we need to retain the mapping column
- // in case if its in this list
- var specifiedCols = Seq[String]()
-
- // For filtering INCLUDE and EXCLUDE fields defined for Measures and Dimensions
- def filterIncludeCols(p: (String, String), fCols: FilterCols): Boolean = {
- if (fCols.includeKey.equalsIgnoreCase(CarbonCommonConstants.INCLUDE)) {
- fCols.fieldList.map(x => x.toLowerCase()).contains(p._1.toLowerCase())
- } else {
- !fCols.fieldList.map(x => x.toLowerCase()).contains(p._1.toLowerCase())
- }
- }
-
- // For filtering the fields defined in Measures and Dimensions fields
- def filterDefinedCols(p: (String, String), definedCols: Seq[Field]) = {
- var isDefined = false
- definedCols.foreach(f => {
- if (f.dataType.isDefined) {
- sys.error(
- s"Specifying Data types is not supported for the fields " +
- s"in the DDL with CSV file or Table : [$f]")
- }
- if (f.column.equalsIgnoreCase(p._1)) {
- isDefined = true
- }
- })
- isDefined
- }
-
- val rawColumns = if (cm.factFieldsList.isDefined) {
- val cols = df.dtypes.map(f => (f._1.trim(), f._2))
- .filter(filterIncludeCols(_, cm.factFieldsList.get))
- specifiedCols = cols.map(_._1)
- cols
- } else {
- df.dtypes.map(f =>
- if (f._2.startsWith("ArrayType") || f._2.startsWith("StructType")) {
- val fieldIndex = df.schema.getFieldIndex(f._1).get
- (f._1.trim(), df.schema.fields(fieldIndex).dataType.simpleString)
- }
- else {
- (f._1.trim(), f._2)
- })
- }
-
- val columns = rawColumns
- .filter(c => !c._2.equalsIgnoreCase(CarbonCommonConstants.BINARY_TYPE))
- if (rawColumns.length > columns.length) {
- LOGGER
- .info("BinaryType is not supported. Ignoring all the Binary fields.")
- }
-
- val (numericColArray, nonNumericColArray) = columns
- .partition(p => numericTypes.map(x => x.toLowerCase()).contains(p._2.toLowerCase()))
-
- // If dimensions are defined along with Fact CSV/table, consider only defined dimensions
- val dimColArray = if (cm.dimCols.nonEmpty) {
- val dcolArray = columns.filter(filterDefinedCols(_, cm.dimCols))
- val listedCols = dcolArray.map(_._1)
- specifiedCols = specifiedCols ++ listedCols
- dcolArray
- } else {
- nonNumericColArray
- }
-
- // If measures are defined along with Fact CSV/table, consider only defined measures
- val measureColArray = if (cm.msrCols.nonEmpty) {
- val mColArray = columns.filter(filterDefinedCols(_, cm.msrCols))
- val listedCols = mColArray.map(_._1)
- specifiedCols = specifiedCols ++ listedCols
- mColArray
- } else {
- if (cm.dimCols.nonEmpty) {
- numericColArray.filterNot(filterDefinedCols(_, cm.dimCols))
- } else {
- numericColArray
- }
- }
-
- measures = measureColArray.map(field => {
- if (cm.msrCols.nonEmpty) {
- val definedField = cm.msrCols.filter(f => f.column.equalsIgnoreCase(field._1))
- if (definedField.nonEmpty && definedField.head.name.isDefined) {
- Measure(
- definedField.head.name.getOrElse(field._1), field._1,
- CarbonScalaUtil.convertSparkToCarbonSchemaDataType(field._2))
- }
- else {
- Measure(field._1, field._1,
- CarbonScalaUtil.convertSparkToCarbonSchemaDataType(field._2))
- }
- }
- else {
- Measure(field._1, field._1,
- CarbonScalaUtil.convertSparkToCarbonSchemaDataType(field._2))
- }
- })
-
- levels = dimColArray.map(field => {
- if (cm.dimCols.nonEmpty) {
- val definedField = cm.dimCols.filter(f => f.column.equalsIgnoreCase(field._1))
- if (definedField.nonEmpty && definedField.head.name.isDefined) {
- Level(
- definedField.head.name.getOrElse(field._1), field._1, Int.MaxValue,
- CarbonScalaUtil.convertSparkToCarbonSchemaDataType(field._2))
- }
- else {
- Level(field._1, field._1, Int.MaxValue,
- CarbonScalaUtil.convertSparkToCarbonSchemaDataType(field._2))
- }
- }
- else {
- Level(field._1, field._1, Int.MaxValue,
- CarbonScalaUtil.convertSparkToCarbonSchemaDataType(field._2))
- }
- })
-
- if (cm.dimRelations.nonEmpty) {
- cm.dimRelations.foreach(relationEntry => {
-
- val relDf = getDataFrame(relationEntry.dimSource, sqlContext)
-
- var right = false
-
- for (field <- relDf.columns.map(f => f.trim())) {
- if (field.equalsIgnoreCase(relationEntry.relation.rightColumn)) {
- right = true
- }
- }
-
- if (!right) {
- val rcl = relationEntry.relation.rightColumn
- LOGGER.error(s"Dimension field defined in the relation [$rcl] " +
- s"is not present in the Dimension source")
- sys.error(
- s"Dimension field defined in the relation [$rcl] " +
- s"is not present in the Dimension source")
- }
-
- val rawRelColumns = if (relationEntry.cols.isDefined) {
- relDf.dtypes.map(f => (f._1.trim(), f._2))
- .filter(filterRelIncludeCols(relationEntry, _))
- } else {
- relDf.dtypes.map(f => (f._1.trim(), f._2))
- }
-
- val relColumns = rawRelColumns
- .filter(c => !c._2.equalsIgnoreCase(CarbonCommonConstants.BINARY_TYPE))
- if (rawRelColumns.length > relColumns.length) {
- LOGGER
- .info("BinaryType is not supported. Ignoring all the Binary fields.")
- }
-
- // Remove the relation column from fact table as it
- // is already considered in dimension table
- levels = levels.dropWhile(
- p => p.column.equalsIgnoreCase(relationEntry.relation.leftColumn) &&
- !specifiedCols.map(x => x.toLowerCase()).contains(p.column.toLowerCase()))
- measures = measures.dropWhile(
- p => p.column.equalsIgnoreCase(relationEntry.relation.leftColumn) &&
- !specifiedCols.map(x => x.toLowerCase()).contains(p.column.toLowerCase()))
-
- val dimFileLevels: Seq[Level] = Seq[Level]()
- relColumns.map(field => {
- Level(field._1, field._1, Int.MaxValue,
- CarbonScalaUtil.convertSparkToCarbonSchemaDataType(field._2))
- }
- )
- val dimFileHierarchies = dimFileLevels.map(field => Hierarchy(relationEntry.tableName,
- Some(
- dimFileLevels.find(dl => dl.name.equalsIgnoreCase(relationEntry.relation.rightColumn))
- .get.column), Seq(field), Some(relationEntry.tableName)))
- dimFileDimensions = dimFileDimensions ++ dimFileHierarchies.map(
- field => Dimension(field.levels.head.name, Seq(field),
- Some(relationEntry.relation.leftColumn)))
-
- levelsToCheckDuplicate = levelsToCheckDuplicate ++ dimFileLevels
-
- if (relation.nonEmpty) {
- relation = relation.append(", ")
- }
- relation = relation.append(relationEntry.tableName).append(" RELATION (FACT.")
- .append(relationEntry.relation.leftColumn).append("=")
- .append(relationEntry.relation.rightColumn).append(") INCLUDE (")
-
- val includeFields = relColumns.map(field => field._1)
- relation = relation.append(includeFields.mkString(", ")).append(")")
-
- })
- }
-
- levelsToCheckDuplicate = levelsToCheckDuplicate ++ levels
- }
- else {
- // Create Table DDL with Database defination
- levels = cm.dimCols.map(
- field => Level(field.name.getOrElse(field.column), field.column, Int.MaxValue,
- field.dataType.getOrElse(CarbonCommonConstants.STRING)))
- measures = cm.msrCols.map(field => Measure(field.name.getOrElse(field.column), field.column,
- field.dataType.getOrElse(CarbonCommonConstants.NUMERIC)))
- levelsToCheckDuplicate = levels
-
- if (cm.withKeyword.equalsIgnoreCase(CarbonCommonConstants.WITH) &&
- cm.simpleDimRelations.nonEmpty) {
- cm.simpleDimRelations.foreach(relationEntry => {
-
- val split = levels.partition(x => relationEntry.cols.get.contains(x.name))
- val dimFileLevels = split._1
-
- if (
- dimFileLevels.count(l => l.name.equalsIgnoreCase(relationEntry.relation.rightColumn)) <=
- 0) {
- val rcl = relationEntry.relation.rightColumn
- LOGGER.error(s"Dimension field defined in the relation [$rcl] " +
- s"is not present in the Dimension source")
- sys.error(
- s"Dimension field defined in the relation [$rcl] " +
- s"is not present in the Dimension source")
- }
-
- val dimFileHierarchies = dimFileLevels.map(field => Hierarchy(relationEntry.tableName,
- Some(
- dimFileLevels.find(dl => dl.name.equalsIgnoreCase(relationEntry.relation.rightColumn))
- .get.column), Seq(field), Some(relationEntry.tableName)))
- dimFileDimensions = dimFileDimensions ++ dimFileHierarchies.map(
- field => Dimension(field.levels.head.name, Seq(field),
- Some(relationEntry.relation.leftColumn)))
-
- if (relation.nonEmpty) {
- relation = relation.append(", ")
- }
- relation = relation.append(relationEntry.tableName).append(" RELATION (FACT.")
- .append(relationEntry.relation.leftColumn).append("=")
- .append(relationEntry.relation.rightColumn).append(") INCLUDE (")
- relation = relation.append(relationEntry.cols.get.mkString(", ")).append(")")
- })
-
- }
- }
-
- // Check if there is any duplicate measures or dimensions.
- // Its based on the dimension name and measure name
- levelsToCheckDuplicate.groupBy(_.name).foreach(f => if (f._2.size > 1) {
- val name = f._1
- LOGGER.error(s"Duplicate dimensions found with name : $name")
- sys.error(s"Duplicate dimensions found with name : $name")
- })
-
- measures.groupBy(_.name).foreach(f => if (f._2.size > 1) {
- val name = f._1
- LOGGER.error(s"Duplicate measures found with name : $name")
- sys.error(s"Duplicate measures found with name : $name")
- })
-
- val levelsArray = levelsToCheckDuplicate.map(_.name)
- val levelsNdMesures = levelsArray ++ measures.map(_.name)
-
- cm.aggregation.foreach(a => {
- if (levelsArray.contains(a.msrName)) {
- val fault = a.msrName
- LOGGER.error(s"Aggregator should not be defined for dimension fields [$fault]")
- sys.error(s"Aggregator should not be defined for dimension fields [$fault]")
- }
- })
-
- levelsNdMesures.groupBy(x => x).foreach(f => if (f._2.size > 1) {
- val name = f._1
- LOGGER.error(s"Dimension and Measure defined with same name : $name")
- sys.error(s"Dimension and Measure defined with same name : $name")
- })
-
- if (levelsArray.size <= 0) {
- sys.error("No Dimensions defined. Table should have atleast one dimesnion !")
- }
-
- val dims = levelsToCheckDuplicate.map(l => l.name + " " + l.dataType)
- command = command.append("DIMENSIONS (").append(dims.mkString(", ")).append(") ")
-
- if (measures.nonEmpty) {
- val mesrs = measures.map(m => m.name + " " + m.dataType)
- command = command.append("MEASURES (").append(mesrs.mkString(", ")).append(")")
- }
-
- if (relation.nonEmpty) {
- command = command.append(" WITH ").append(relation)
- }
-
- if (cm.aggregation.nonEmpty || cm.partitioner.isDefined) {
- command = command.append(" OPTIONS( ")
-
- if (cm.aggregation.nonEmpty) {
- val aggs = cm.aggregation.map(a => a.msrName + "=" + a.aggType)
- command = command.append("AGGREGATION[ ").append(aggs.mkString(", ")).append(" ] ")
- if (cm.partitioner.isDefined) {
- command = command.append(", ")
- }
- }
-
- if (cm.partitioner.isDefined) {
- val partn = cm.partitioner.get
- command = command.append("PARTITIONER[ CLASS='").append(partn.partitionClass)
- .append("', COLUMNS=(").append(partn.partitionColumn.mkString(", "))
- .append("), PARTITION_COUNT=").append(partn.partitionCount).append(" ]")
- }
-
- command = command.append(" )")
- }
-
- command = command.append(";")
-
- val hierarchies = levels.map(field => Hierarchy(field.name, None, Seq(field), None))
- var dimensions = hierarchies.map(field => Dimension(field.name, Seq(field), None))
- dimensions = dimensions ++ dimFileDimensions
-
- cm.partitioner match {
- case Some(part: Partitioner) =>
- var definedpartCols = part.partitionColumn
- val columnBuffer = new ArrayBuffer[String]
- part.partitionColumn.foreach { col =>
- dimensions.foreach { dim =>
- dim.hierarchies.foreach { hier =>
- hier.levels.foreach { lev =>
- if (lev.name.equalsIgnoreCase(col)) {
- definedpartCols = definedpartCols.dropWhile(c => c.equals(col))
- columnBuffer += lev.name
- }
- }
- }
- }
- }
-
- try {
- Class.forName(part.partitionClass).newInstance()
- } catch {
- case e: Exception =>
- val cl = part.partitionClass
- sys.error(s"partition class specified can not be found or loaded : $cl")
- }
-
- if (definedpartCols.nonEmpty) {
- val msg = definedpartCols.mkString(", ")
- LOGGER.error(s"partition columns specified are not part of Dimension columns : $msg")
- sys.error(s"partition columns specified are not part of Dimension columns : $msg")
- }
-
- case None =>
- }
- Seq(Row(command.toString))
- }
-
- def getDataFrame(factSource: Object, sqlContext: SQLContext): DataFrame = {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- var dataFrame: DataFrame = null
-
- factSource match {
- case factFile: String =>
- val fileType = FileFactory.getFileType(factFile)
-
- if (FileFactory.isFileExist(factFile, fileType)) {
- dataFrame = sqlContext.read.format("com.databricks.spark.csv").options(
- Map("path" -> factFile, "header" -> "true", "inferSchema" -> "true")).load(factFile)
- }
- else {
- LOGGER.error(s"Input source file $factFile does not exists")
- sys.error(s"Input source file $factFile does not exists")
- }
- case tableInfo: Seq[String] =>
- val dbName = if (tableInfo.size > 1) {
- tableInfo.head
- } else {
- getDB.getDatabaseName(None, sqlContext)
- }
- val tableName = if (tableInfo.size > 1) {
- tableInfo(1)
- } else {
- tableInfo.head
- }
-
- if (sqlContext.tableNames(dbName).map(x => x.toLowerCase())
- .contains(tableName.toLowerCase())) {
- dataFrame = DataFrame(sqlContext,
- sqlContext.catalog.lookupRelation(TableIdentifier(tableName, Some(dbName))))
- } else {
- LOGGER.error(s"Input source table $tableName does not exists")
- sys.error(s"Input source table $tableName does not exists")
- }
- }
- dataFrame
- }
-
- // For filtering INCLUDE and EXCLUDE fields if any is defined for Dimention relation
- def filterRelIncludeCols(relationEntry: DimensionRelation, p: (String, String)): Boolean = {
- if (relationEntry.includeKey.get.equalsIgnoreCase(CarbonCommonConstants.INCLUDE)) {
- relationEntry.cols.get.map(x => x.toLowerCase()).contains(p._1.toLowerCase())
- } else {
- !relationEntry.cols.get.map(x => x.toLowerCase()).contains(p._1.toLowerCase())
- }
- }
-
-}
-
-
// These are the assumptions made
// 1.We have a single hierarchy under a dimension tag and a single level under a hierarchy tag
// 2.The names of dimensions and measures are case insensitive
@@ -1245,7 +816,7 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
}
}
-private[sql] case class CreateCube(cm: tableModel) extends RunnableCommand {
+private[sql] case class CreateTable(cm: tableModel) extends RunnableCommand {
def run(sqlContext: SQLContext): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -1276,7 +847,7 @@ private[sql] case class CreateCube(cm: tableModel) extends RunnableCommand {
val tablePath = catalog.createCubeFromThrift(tableInfo, dbName, tbName, null)(sqlContext)
try {
sqlContext.sql(
- s"""CREATE TABLE $dbName.$tbName USING org.apache.spark.sql.CarbonSource""" +
+ s"""CREATE TABLE $dbName.$tbName USING carbondata""" +
s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$tablePath") """)
.collect
} catch {
@@ -1614,21 +1185,6 @@ private[sql] case class LoadTable(
}
-private[sql] case class AddAggregatesToTable(
- schemaNameOp: Option[String],
- cubeName: String,
- aggregateAttributes: Seq[AggregateTableAttributes])
- extends RunnableCommand {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-
- def run(sqlContext: SQLContext): Seq[Row] = {
- // TODO: Implement it
- Seq.empty
- }
-}
-
private[sql] case class PartitionData(databaseName: String, tableName: String, factPath: String,
targetPath: String, delimiter: String, quoteChar: String,
fileHeader: String, escapeChar: String, multiLine: Boolean)
@@ -1657,89 +1213,6 @@ private[sql] case class PartitionData(databaseName: String, tableName: String, f
}
}
-private[sql] case class ShowAllTablesInSchema(
- schemaNameOp: Option[String],
- override val output: Seq[Attribute]
-) extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
- CarbonEnv.getInstance(sqlContext).carbonCatalog.getTables(Some(dbName))(sqlContext)
- .map{x =>
- Row(x._1, sqlContext.asInstanceOf[HiveContext]
- .catalog.tableExists(TableIdentifier(x._1, Some(dbName))))
- }
- }
-}
-
-private[sql] case class ShowAllTables(override val output: Seq[Attribute])
- extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- CarbonEnv.getInstance(sqlContext).carbonCatalog.getAllTables()(sqlContext)
- .map { x =>
- Row(x.database.get, x.table, sqlContext.asInstanceOf[HiveContext].catalog.tableExists(x))
- }
- }
-
-}
-
-private[sql] case class ShowAllTablesDetail(
- schemaNameOp: Option[String],
- override val output: Seq[Attribute]
-) extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val dSchemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
- sqlContext.catalog.getTables(Some(dSchemaName))
- .map(x => Row(null, dSchemaName, x._1, "TABLE", ""))
- }
-}
-
-private[sql] case class MergeTable(dbName: String, cubeName: String, tableName: String)
- extends RunnableCommand {
-
- def run(sqlContext: SQLContext): Seq[Row] = {
- val identifier = TableIdentifier(tableName, Option(cubeName))
- val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
- .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
- if (relation == null) {
- sys.error(s"Table $dbName.$cubeName does not exist")
- }
- val carbonLoadModel = new CarbonLoadModel()
- carbonLoadModel.setTableName(cubeName)
- carbonLoadModel.setDatabaseName(dbName)
- val table = relation.cubeMeta.carbonTable
- var isTablePresent = false
- if (table.getFactTableName.equals(tableName)) {
- isTablePresent = true
- }
- if (!isTablePresent) {
- val aggTables = table.getAggregateTablesName.asScala.toArray
- var aggTable = null
- for (aggTable <- aggTables if aggTable.equals(tableName)) {
- isTablePresent = true
- }
- }
- if (!isTablePresent) {
- sys.error("Invalid table name!")
- }
- carbonLoadModel.setTableName(tableName)
- val dataLoadSchema = new CarbonDataLoadSchema(relation.cubeMeta.carbonTable)
- // Need to fill dimension relation
- // dataLoadSchema.setDimensionRelationList(x$1)
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- var storeLocation = CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
- System.getProperty("java.io.tmpdir"))
- storeLocation = storeLocation + "/carbonstore/" + System.currentTimeMillis()
- CarbonDataRDDFactory
- .mergeCarbonData(sqlContext, carbonLoadModel, storeLocation, relation.cubeMeta.storePath,
- relation.cubeMeta.partitioner)
- Seq.empty
- }
-}
-
private[sql] case class DropTableCommand(ifExistsSet: Boolean, schemaNameOp: Option[String],
tableName: String)
extends RunnableCommand {
@@ -1894,16 +1367,6 @@ private[sql] case class ShowLoads(
}
-private[sql] case class ShowAggregateTables(
- schemaNameOp: Option[String],
- override val output: Seq[Attribute])
- extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- Seq(Row("AggTable1"), Row("AggTable2"))
- }
-}
-
private[sql] case class DescribeCommandFormatted(
child: SparkPlan,
override val output: Seq[Attribute],
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f9f9f44/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 756e088..f620f13 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -223,21 +223,11 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
object DDLStrategies extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case ShowCubeCommand(schemaName) =>
- ExecutedCommand(ShowAllTablesInSchema(schemaName, plan.output)) :: Nil
- case c@ShowAllCubeCommand() =>
- ExecutedCommand(ShowAllTables(plan.output)) :: Nil
- case ShowCreateCubeCommand(cm) =>
- ExecutedCommand(ShowCreateTable(cm, plan.output)) :: Nil
- case ShowTablesDetailedCommand(schemaName) =>
- ExecutedCommand(ShowAllTablesDetail(schemaName, plan.output)) :: Nil
case DropTable(tableName, ifNotExists)
if CarbonEnv.getInstance(sqlContext).carbonCatalog
.tableExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) =>
val identifier = toTableIdentifier(tableName.toLowerCase)
ExecutedCommand(DropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil
- case ShowAggregateTablesCommand(schemaName) =>
- ExecutedCommand(ShowAggregateTables(schemaName, plan.output)) :: Nil
case ShowLoadsCommand(schemaName, cube, limit) =>
ExecutedCommand(ShowLoads(schemaName, cube, limit, plan.output)) :: Nil
case LoadTable(schemaNameOp, cubeName, factPathFromUser, dimFilesPath,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f9f9f44/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..607e962
--- /dev/null
+++ b/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.spark.sql.CarbonSource
\ No newline at end of file
[2/2] incubator-carbondata git commit: [CARBONDATA-47] Simplified
datasource format name and storage name in carbondata. This closes #32
Posted by ch...@apache.org.
[CARBONDATA-47] Simplified datasource format name and storage name in carbondata. This closes #32
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/90139be5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/90139be5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/90139be5
Branch: refs/heads/master
Commit: 90139be56533bdc3c0a64a814cbfa9516eea51b0
Parents: 367b9b2 8f9f9f4
Author: chenliang613 <ch...@apache.org>
Authored: Tue Jul 12 23:57:24 2016 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Tue Jul 12 23:57:24 2016 +0530
----------------------------------------------------------------------
assembly/pom.xml | 3 +
.../org/carbondata/examples/CarbonExample.scala | 2 +-
.../examples/DataFrameAPIExample.scala | 9 +-
integration/spark/pom.xml | 3 +
.../spark/sql/CarbonCatalystOperators.scala | 90 ---
.../org/apache/spark/sql/CarbonContext.scala | 6 +-
.../spark/sql/CarbonDatasourceRelation.scala | 9 +-
.../org/apache/spark/sql/CarbonSqlParser.scala | 20 +-
.../execution/command/carbonTableSchema.scala | 541 +------------------
.../spark/sql/hive/CarbonStrategies.scala | 10 -
....apache.spark.sql.sources.DataSourceRegister | 1 +
11 files changed, 28 insertions(+), 666 deletions(-)
----------------------------------------------------------------------