Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/08 16:55:47 UTC
[46/54] [abbrv] carbondata git commit: [CARBONDATA-1543] Supported DataMap chooser and expression for supporting multiple datamaps in single query
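In short, this change lets the new DataMap chooser route a single query's filter across several datamaps on the same table. A minimal sketch of the enabled usage, distilled from the test cases added in this commit (table, column, and datamap names are illustrative; assumes a QueryTest-style suite where sql() is in scope, as in the files below):

    sql("CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) " +
      "STORED BY 'org.apache.carbondata.format'")
    // One datamap per indexed column; DMPROPERTIES now carries the index columns.
    sql(s"CREATE DATAMAP dm_name ON TABLE datamap_test " +
      s"USING '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
    sql(s"CREATE DATAMAP dm_city ON TABLE datamap_test " +
      s"USING '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
    // The chooser can now consult both datamaps for this one query.
    sql("SELECT * FROM datamap_test WHERE name='n502670' AND city='c2670'").show()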
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index ccb9b68..b1962c1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -19,15 +19,15 @@ package org.apache.carbondata.integration.spark.testsuite.preaggregate
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, Row}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
@@ -293,7 +293,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
test("test pre agg create table 22: using invalid datamap provider") {
sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
- val e: Exception = intercept[MalformedDataMapCommandException] {
+ val e = intercept[MalformedDataMapCommandException] {
sql(
"""
| CREATE DATAMAP agg0 ON TABLE mainTable
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index 43316b3..ec76b37 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -201,7 +201,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
| GROUP BY dataTime
""".stripMargin)
}
- assert(e.getMessage.equals("Unknown datamap provider/class abc"))
+ assert(e.getMessage.equals("DataMap class 'abc' not found"))
}
test("test timeseries create table 12: USING and catch MalformedCarbonCommandException") {
@@ -216,7 +216,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
| GROUP BY dataTime
""".stripMargin)
}
- assert(e.getMessage.equals("Unknown datamap provider/class abc"))
+ assert(e.getMessage.equals("DataMap class 'abc' not found"))
}
test("test timeseries create table 13: Only one granularity level can be defined 1") {
@@ -237,6 +237,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
| GROUP BY dataTime
""".stripMargin)
}
+ e.printStackTrace()
assert(e.getMessage.equals("Only one granularity level can be defined"))
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
index c522c1e..e31896f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
@@ -113,7 +113,7 @@ object CompactionSupportGlobalSortBigFileTest {
try {
val write = new PrintWriter(fileName);
for (i <- start until (start + line)) {
- write.println(i + "," + "n" + i + "," + "c" + Random.nextInt(line) + "," + Random.nextInt(80))
+ write.println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + Random.nextInt(80))
}
write.close()
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 693c145..a214444 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -92,18 +92,18 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
buildTestData
}
-test("test the boolean data type"){
- booldf.write
- .format("carbondata")
- .option("tableName", "carbon10")
- .option("tempCSV", "true")
- .option("compress", "true")
- .mode(SaveMode.Overwrite)
- .save()
- checkAnswer(
- sql("SELECT * FROM CARBON10"),
- Seq(Row("anubhav", true), Row("prince", false)))
-}
+ test("test the boolean data type"){
+ booldf.write
+ .format("carbondata")
+ .option("tableName", "carbon0")
+ .option("tempCSV", "true")
+ .option("compress", "true")
+ .mode(SaveMode.Overwrite)
+ .save()
+ checkAnswer(
+ sql("SELECT * FROM CARBON0"),
+ Seq(Row("anubhav", true), Row("prince", false)))
+ }
test("test load dataframe with saving compressed csv files") {
// save dataframe to carbon file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 1cbbcb4..041a63a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
@@ -48,22 +49,21 @@ import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlo
class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
var identifier: AbsoluteTableIdentifier = _
- var dataMapName: String = _
+ var dataMapSchema: DataMapSchema = _
/**
* Initialization of Datamap factory with the identifier and datamap name
*/
- override def init(identifier: AbsoluteTableIdentifier,
- dataMapName: String): Unit = {
+ override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
this.identifier = identifier
- this.dataMapName = dataMapName
+ this.dataMapSchema = dataMapSchema
}
/**
* Return a new writer for this datamap
*/
override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
- new CGDataMapWriter(identifier, segment, dataWritePath, dataMapName)
+ new CGDataMapWriter(identifier, segment, dataWritePath, dataMapSchema)
}
/**
@@ -140,7 +140,8 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
* Return metadata of this datamap
*/
override def getMeta: DataMapMeta = {
- new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+ new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+ List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
}
}
@@ -198,12 +199,16 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
}
private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
- if (expression.getChildren != null) {
- expression.getChildren.asScala.map { f =>
- if (f.isInstanceOf[EqualToExpression]) {
- buffer += f
+ if (expression.isInstanceOf[EqualToExpression]) {
+ buffer += expression
+ } else {
+ if (expression.getChildren != null) {
+ expression.getChildren.asScala.map { f =>
+ if (f.isInstanceOf[EqualToExpression]) {
+ buffer += f
+ }
+ getEqualToExpression(f, buffer)
}
- getEqualToExpression(f, buffer)
}
}
}
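Why the traversal rewrite above matters: a predicate such as name = 'n1' can arrive as a bare EqualToExpression with no enclosing AND/OR node, and the old children-only walk never matched it. A hedged sketch of the same recursion in isolation (Expression, EqualToExpression, ArrayBuffer, and the JavaConverters import as in the file above); note this version also avoids re-adding a matched child, which the in-loop check plus unconditional recursion in the committed helper appears to do twice:

    private def collectEqualTo(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
      if (expression.isInstanceOf[EqualToExpression]) {
        // Handles the bare top-level case the old code missed.
        buffer += expression
      } else if (expression.getChildren != null) {
        expression.getChildren.asScala.foreach(collectEqualTo(_, buffer))
      }
    }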
@@ -221,12 +226,12 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
segment: Segment,
dataWritePath: String,
- dataMapName: String)
+ dataMapSchema: DataMapSchema)
extends AbstractDataMapWriter(identifier, segment, dataWritePath) {
var currentBlockId: String = null
val cgwritepath = dataWritePath + "/" +
- dataMapName + System.nanoTime() + ".datamap"
+ dataMapSchema.getDataMapName + System.nanoTime() + ".datamap"
lazy val stream: DataOutputStream = FileFactory
.getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
val blockletList = new ArrayBuffer[Array[Byte]]()
@@ -345,14 +350,29 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
""".stripMargin)
val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
// register datamap writer
- DataMapStoreManager.getInstance().createAndRegisterDataMap(
- table.getAbsoluteTableIdentifier,
- classOf[CGDataMapFactory].getName, "cgdatamap")
+ sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
sql("select * from normal_test where name='n502670'"))
}
+ test("test cg datamap with 2 datamaps ") {
+ sql("DROP TABLE IF EXISTS datamap_test")
+ sql(
+ """
+ | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+ // register datamap writer
+ sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
+ sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+ checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+ sql("select * from normal_test where name='n502670' and city='c2670'"))
+ }
+
override protected def afterAll(): Unit = {
CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
sql("DROP TABLE IF EXISTS normal_test")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 7e93959..00d13a9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainData
import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.Event
@@ -41,7 +42,7 @@ class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
var identifier: AbsoluteTableIdentifier = _
override def init(identifier: AbsoluteTableIdentifier,
- dataMapName: String): Unit = {
+ dataMapSchema: DataMapSchema): Unit = {
this.identifier = identifier
}
@@ -89,12 +90,9 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
}
test("test write datamap 2 pages") {
+ sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
// register datamap writer
- DataMapStoreManager.getInstance().createAndRegisterDataMap(
- AbsoluteTableIdentifier.from(storeLocation + "/carbon1", "default", "carbon1"),
- classOf[C2DataMapFactory].getName,
- "test")
-
+ sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'")
val df = buildTestData(33000)
// save dataframe to carbon file
@@ -119,11 +117,8 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
}
test("test write datamap 2 blocklet") {
- // register datamap writer
- DataMapStoreManager.getInstance().createAndRegisterDataMap(
- AbsoluteTableIdentifier.from(storeLocation + "/carbon2", "default", "carbon2"),
- classOf[C2DataMapFactory].getName,
- "test")
+ sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+ sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2DataMapFactory].getName}'")
CarbonProperties.getInstance()
.addProperty("carbon.blockletgroup.size.in.mb", "1")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 9c8cc15..90f0972 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.indexstore.{Blocklet, FineGrainBlocklet, PartitionSpec}
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
@@ -48,22 +49,21 @@ import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlo
class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
var identifier: AbsoluteTableIdentifier = _
- var dataMapName: String = _
+ var dataMapSchema: DataMapSchema = _
/**
* Initialization of Datamap factory with the identifier and datamap name
*/
- override def init(identifier: AbsoluteTableIdentifier,
- dataMapName: String): Unit = {
+ override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
this.identifier = identifier
- this.dataMapName = dataMapName
+ this.dataMapSchema = dataMapSchema
}
/**
* Return a new writer for this datamap
*/
override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
- new FGDataMapWriter(identifier, segment, dataWritePath, dataMapName)
+ new FGDataMapWriter(identifier, segment, dataWritePath, dataMapSchema)
}
/**
@@ -136,7 +136,8 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
* Return metadata of this datamap
*/
override def getMeta: DataMapMeta = {
- new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+ new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+ List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
}
}
@@ -225,12 +226,16 @@ class FGDataMap extends AbstractFineGrainDataMap {
}
def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
- if (expression.getChildren != null) {
- expression.getChildren.asScala.map { f =>
- if (f.isInstanceOf[EqualToExpression]) {
- buffer += f
+ if (expression.isInstanceOf[EqualToExpression]) {
+ buffer += expression
+ } else {
+ if (expression.getChildren != null) {
+ expression.getChildren.asScala.map { f =>
+ if (f.isInstanceOf[EqualToExpression]) {
+ buffer += f
+ }
+ getEqualToExpression(f, buffer)
}
- getEqualToExpression(f, buffer)
}
}
}
@@ -246,11 +251,12 @@ class FGDataMap extends AbstractFineGrainDataMap {
}
class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
- segment: Segment, dataWriterPath: String, dataMapName: String)
+ segment: Segment, dataWriterPath: String, dataMapSchema: DataMapSchema)
extends AbstractDataMapWriter(identifier, segment, dataWriterPath) {
var currentBlockId: String = null
- val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap"
+ val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
+ ".datamap"
val stream: DataOutputStream = FileFactory
.getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
@@ -421,14 +427,44 @@ class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
""".stripMargin)
val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
// register datamap writer
- DataMapStoreManager.getInstance().createAndRegisterDataMap(
- table.getAbsoluteTableIdentifier,
- classOf[FGDataMapFactory].getName, "fgdatamap")
+ sql(
+ s"""
+ | CREATE DATAMAP ggdatamap ON TABLE datamap_test
+ | USING '${classOf[FGDataMapFactory].getName}'
+ | DMPROPERTIES('indexcolumns'='name')
+ """.stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
checkAnswer(sql("select * from datamap_test where name='n502670'"),
sql("select * from normal_test where name='n502670'"))
}
+ test("test fg datamap with 2 datamaps ") {
+ sql("DROP TABLE IF EXISTS datamap_test")
+ sql(
+ """
+ | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+ // register datamap writer
+ sql(
+ s"""
+ | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test
+ | USING '${classOf[FGDataMapFactory].getName}'
+ | DMPROPERTIES('indexcolumns'='name')
+ """.stripMargin)
+ sql(
+ s"""
+ | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test
+ | USING '${classOf[FGDataMapFactory].getName}'
+ | DMPROPERTIES('indexcolumns'='city')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+ checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+ sql("select * from normal_test where name='n502670' and city='c2670'"))
+ }
+
override protected def afterAll(): Unit = {
CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
sql("DROP TABLE IF EXISTS normal_test")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 6ac9c7a..717af6f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -23,7 +23,8 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.common.exceptions.MetadataProcessException
+import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.util.CarbonProperties
@@ -42,39 +43,22 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
val newClass = "org.apache.spark.sql.CarbonSource"
- test("test datamap create: don't support using class, only support short name") {
- intercept[MalformedDataMapCommandException] {
+ test("test datamap create: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
- val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
- assert(table != null)
- val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
- assert(dataMapSchemaList.size() == 1)
- assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap1"))
- assert(dataMapSchemaList.get(0).getClassName.equals(newClass))
}
}
- test("test datamap create with dmproperties: don't support using class") {
- intercept[MalformedDataMapCommandException] {
+ test("test datamap create with dmproperties: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
- val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
- assert(table != null)
- val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
- assert(dataMapSchemaList.size() == 2)
- assert(dataMapSchemaList.get(1).getDataMapName.equals("datamap2"))
- assert(dataMapSchemaList.get(1).getClassName.equals(newClass))
- assert(dataMapSchemaList.get(1).getProperties.get("key").equals("value"))
}
}
- test("test datamap create with existing name: don't support using class") {
- intercept[MalformedDataMapCommandException] {
+ test("test datamap create with existing name: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
sql(
s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
- val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
- assert(table != null)
- val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
- assert(dataMapSchemaList.size() == 2)
}
}
@@ -106,8 +90,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
- }
- finally {
+ } finally {
sql("drop table hiveMetaStoreTable")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
@@ -136,6 +119,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
"datamap_hiveMetaStoreTable_1")
assert(sql("show datamap on table hiveMetaStoreTable_1").collect().length == 0)
sql("drop table hiveMetaStoreTable_1")
+
+ checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
}
finally {
CarbonProperties.getInstance()
@@ -145,17 +130,17 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
}
test("test datamap create with preagg with duplicate name") {
- intercept[Exception] {
- sql(
- s"""
- | CREATE DATAMAP datamap2 ON TABLE datamaptest
- | USING 'preaggregate'
- | DMPROPERTIES('key'='value')
- | AS SELECT COUNT(a) FROM datamaptest
+ sql(
+ s"""
+ | CREATE DATAMAP datamap10 ON TABLE datamaptest
+ | USING 'preaggregate'
+ | DMPROPERTIES('key'='value')
+ | AS SELECT COUNT(a) FROM datamaptest
""".stripMargin)
+ intercept[MalformedDataMapCommandException] {
sql(
s"""
- | CREATE DATAMAP datamap2 ON TABLE datamaptest
+ | CREATE DATAMAP datamap10 ON TABLE datamaptest
| USING 'preaggregate'
| DMPROPERTIES('key'='value')
| AS SELECT COUNT(a) FROM datamaptest
@@ -167,10 +152,9 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
assert(dataMapSchemaList.size() == 2)
}
- test("test datamap drop with preagg") {
- intercept[Exception] {
- sql("drop table datamap3")
-
+ test("test drop non-exist datamap") {
+ intercept[NoSuchDataMapException] {
+ sql("drop datamap nonexist on table datamaptest")
}
val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
assert(table != null)
@@ -178,8 +162,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
assert(dataMapSchemaList.size() == 2)
}
- test("test show datamap without preaggregate: don't support using class") {
- intercept[MalformedDataMapCommandException] {
+ test("test show datamap without preaggregate: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
sql("drop table if exists datamapshowtest")
sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')")
@@ -188,8 +172,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
}
}
- test("test show datamap with preaggregate: don't support using class") {
- intercept[MalformedDataMapCommandException] {
+ test("test show datamap with preaggregate: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
sql("drop table if exists datamapshowtest")
sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
@@ -206,8 +190,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
assert(sql("show datamap on table datamapshowtest").collect().length == 0)
}
- test("test show datamap after dropping datamap: don't support using class") {
- intercept[MalformedDataMapCommandException] {
+ test("test show datamap after dropping datamap: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
sql("drop table if exists datamapshowtest")
sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 60052f0..8d2f9ee 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -37,8 +37,8 @@ import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
class SparkDataMapJob extends DataMapJob {
override def execute(dataMapFormat: DistributableDataMapFormat,
- resolverIntf: FilterResolverIntf): util.List[ExtendedBlocklet] = {
- new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, resolverIntf).collect().toList
+ filter: FilterResolverIntf): util.List[ExtendedBlocklet] = {
+ new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, filter).collect().toList
.asJava
}
}
@@ -53,7 +53,6 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) exte
* RDD to prune the datamaps across spark cluster
* @param sc
* @param dataMapFormat
- * @param resolverIntf
*/
class DataMapPruneRDD(sc: SparkContext,
dataMapFormat: DistributableDataMapFormat,
@@ -70,7 +69,6 @@ class DataMapPruneRDD(sc: SparkContext,
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit
- DistributableDataMapFormat.setFilterExp(attemptContext.getConfiguration, resolverIntf)
val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext)
reader.initialize(inputSplit, attemptContext)
val iter = new Iterator[ExtendedBlocklet] {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index adf5e04..aab2897 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -101,8 +101,12 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
CarbonException.analysisException(s"table path already exists.")
case (SaveMode.Overwrite, true) =>
val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
- sqlContext.sparkSession
- .sql(s"DROP TABLE IF EXISTS $dbName.${options.tableName}")
+ // In order to overwrite, delete all segments in the table
+ sqlContext.sparkSession.sql(
+ s"""
+ | DELETE FROM TABLE $dbName.${options.tableName}
+ | WHERE SEGMENT.STARTTIME BEFORE '2099-06-01 01:00:00'
+ """.stripMargin)
(true, false)
case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
(true, false)
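A note on the CarbonSource change above: SaveMode.Overwrite previously issued DROP TABLE IF EXISTS, which discarded the table definition along with the data; it now deletes every segment (the STARTTIME BEFORE '2099-06-01' bound effectively means "all of them"), so the table, and presumably any datamap schemas attached to it, survives the overwrite. Illustrative caller-side usage, given some DataFrame df (option values are assumptions):

    import org.apache.spark.sql.SaveMode
    df.write
      .format("carbondata")
      .option("tableName", "carbon_tbl") // hypothetical table name
      .mode(SaveMode.Overwrite)          // now clears segments instead of dropping the table
      .save()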
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index d536746..1de66c1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -32,7 +32,9 @@ import org.apache.carbondata.core.scan.filter.intf.{ExpressionType, RowIntf}
import org.apache.carbondata.spark.util.CarbonScalaUtil
-class SparkUnknownExpression(var sparkExp: SparkExpression)
+class SparkUnknownExpression(
+ var sparkExp: SparkExpression,
+ expressionType: ExpressionType = ExpressionType.UNKNOWN)
extends UnknownExpression with ConditionalExpression {
private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
@@ -64,7 +66,7 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
}
override def getFilterExpressionType: ExpressionType = {
- ExpressionType.UNKNOWN
+ expressionType
}
override def getString: String = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 6508777..c6d86b3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -16,17 +16,23 @@
*/
package org.apache.spark.sql.execution.command.datamap
+import scala.collection.JavaConverters._
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
+import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
/**
* Below command class will be used to create datamap on table
@@ -52,61 +58,62 @@ case class CarbonCreateDataMapCommand(
if (carbonTable.isStreamingTable) {
throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
}
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
- val tableName = tableIdentifier.table + "_" + dataMapName
- val newDmProperties = if (dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).isDefined) {
- dmProperties.updated(TimeSeriesUtil.TIMESERIES_EVENTTIME,
- dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get.trim)
- } else {
- dmProperties
- }
- val dataMapProvider = {
- try {
- DataMapProvider.getDataMapProvider(dmClassName)
- } catch {
- case e: UnsupportedOperationException =>
- throw new MalformedDataMapCommandException(e.getMessage)
- }
- }
- if (sparkSession.sessionState.catalog.listTables(dbName)
- .exists(_.table.equalsIgnoreCase(tableName))) {
- LOGGER.audit(
- s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
- s"Table [$tableName] already exists under database [$dbName]")
- tableIsExists = true
- if (!ifNotExistsSet) {
- throw new TableAlreadyExistsException(dbName, tableName)
- }
- } else {
- TimeSeriesUtil.validateTimeSeriesGranularity(newDmProperties, dmClassName)
- createPreAggregateTableCommands = if (dataMapProvider == TIMESERIES) {
- val details = TimeSeriesUtil
- .getTimeSeriesGranularityDetails(newDmProperties, dmClassName)
- val updatedDmProperties = newDmProperties - details._1
+ validateDataMapName(carbonTable)
+
+ if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
+ dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
+ TimeSeriesUtil.validateTimeSeriesGranularity(dmProperties, dmClassName)
+ createPreAggregateTableCommands = if (dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
+ val details = TimeSeriesUtil.getTimeSeriesGranularityDetails(dmProperties, dmClassName)
+ val updatedDmProperties = dmProperties - details._1
CreatePreAggregateTableCommand(
dataMapName,
tableIdentifier,
- dataMapProvider,
+ DataMapProvider.TIMESERIES,
updatedDmProperties,
queryString.get,
- Some(details._1),
- ifNotExistsSet = ifNotExistsSet)
+ Some(details._1))
} else {
CreatePreAggregateTableCommand(
dataMapName,
tableIdentifier,
- dataMapProvider,
- newDmProperties,
- queryString.get,
- ifNotExistsSet = ifNotExistsSet)
+ DataMapProvider.PREAGGREGATE,
+ dmProperties,
+ queryString.get
+ )
+ }
+ try {
+ createPreAggregateTableCommands.processMetadata(sparkSession)
+ } catch {
+ case e: Throwable => throw new MetadataProcessException(s"Failed to create datamap " +
+ s"'$dataMapName'", e)
}
- createPreAggregateTableCommands.processMetadata(sparkSession)
+ } else {
+ val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
+ dataMapSchema.setProperties(new java.util.HashMap[String, String](dmProperties.asJava))
+ val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
+ Some(dbName),
+ tableIdentifier.table)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
+ DataMapStoreManager.getInstance().createAndRegisterDataMap(
+ carbonTable.getAbsoluteTableIdentifier, dataMapSchema)
+ // Save DataMapSchema in the schema file of main table
+ PreAggregateUtil.updateMainTable(carbonTable, dataMapSchema, sparkSession)
}
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${tableIdentifier.table}")
Seq.empty
}
+ private def validateDataMapName(carbonTable: CarbonTable): Unit = {
+ val existingDataMaps = carbonTable.getTableInfo.getDataMapSchemaList
+ existingDataMaps.asScala.foreach { dataMapSchema =>
+ if (dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName)) {
+ throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exist")
+ }
+ }
+ }
+
override def processData(sparkSession: SparkSession): Seq[Row] = {
if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
@@ -116,7 +123,7 @@ case class CarbonCreateDataMapCommand(
Seq.empty
}
} else {
- throw new MalformedDataMapCommandException("Unknown datamap provider/class " + dmClassName)
+ Seq.empty
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index dcc71a2..e89d15a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
+import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException}
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -74,7 +75,7 @@ case class CarbonDropDataMapCommand(
Some(CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession))
} catch {
case ex: NoSuchTableException =>
- throw ex
+ throw new MetadataProcessException(s"Dropping datamap $dataMapName failed", ex)
}
// If datamap to be dropped in parent table then drop the datamap from metastore and remove
// entry from parent table.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index c836584..c02ac4f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
@@ -67,8 +68,10 @@ case class CreatePreAggregateTableCommand(
dmProperties.foreach(t => tableProperties.put(t._1, t._2))
parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
- assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table),
- "Parent table name is different in select and create")
+ if (!parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table)) {
+ throw new MalformedDataMapCommandException(
+ "Parent table name is different in select and create")
+ }
var neworder = Seq[String]()
val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
parentOrder.foreach(parentcol =>
@@ -131,23 +134,19 @@ case class CreatePreAggregateTableCommand(
dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
// updating the parent table about child table
- try {
- PreAggregateUtil.updateMainTable(
- CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession),
- parentTableIdentifier.table,
- childSchema,
- sparkSession)
- } catch {
- case ex: Exception =>
- undoMetadata(sparkSession, ex)
- throw ex
- }
+ PreAggregateUtil.updateMainTable(
+ parentTable,
+ childSchema,
+ sparkSession)
+ // After updating the parent carbon table with data map entry extract the latest table object
+ // to be used in further create process.
+ parentTable = CarbonEnv.getCarbonTable(parentTableIdentifier.database,
+ parentTableIdentifier.table)(sparkSession)
val updatedLoadQuery = if (timeSeriesFunction.isDefined) {
PreAggregateUtil.createTimeSeriesSelectQueryFromMain(childSchema.getChildSchema,
parentTable.getTableName,
parentTable.getDatabaseName)
- }
- else {
+ } else {
queryString
}
val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index a5d256c..dd42b50 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.types.DataType
+import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -406,21 +407,20 @@ object PreAggregateUtil {
* Below method will be used to update the main table about the pre aggregate table information
* in case of any exception it will throw error so pre aggregate table creation will fail
*
- * @param dbName
- * @param tableName
* @param childSchema
* @param sparkSession
*/
- def updateMainTable(dbName: String, tableName: String,
+ def updateMainTable(carbonTable: CarbonTable,
childSchema: DataMapSchema, sparkSession: SparkSession): Unit = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
LockUsage.DROP_TABLE_LOCK)
var locks = List.empty[ICarbonLock]
- var carbonTable: CarbonTable = null
+ var numberOfCurrentChild: Int = 0
+ val dbName = carbonTable.getDatabaseName
+ val tableName = carbonTable.getTableName
try {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
// get the latest carbon table and check for column existence
// read the latest schema file
@@ -433,7 +433,7 @@ object PreAggregateUtil {
carbonTable.getTablePath)
if (wrapperTableInfo.getDataMapSchemaList.asScala.
exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
- throw new Exception("Duplicate datamap")
+ throw new MetadataProcessException("DataMap name already exist")
}
wrapperTableInfo.getDataMapSchemaList.add(childSchema)
val thriftTable = schemaConverter
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index f6d5e25..e82c9d7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -614,7 +614,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
- case StartsWith(a: Attribute, Literal(v, t)) =>
+ case s@StartsWith(a: Attribute, Literal(v, t)) =>
Some(sources.StringStartsWith(a.name, v.toString))
case c@EndsWith(a: Attribute, Literal(v, t)) =>
Some(CarbonEndsWith(c))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index e5eb53c..38c5146 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -41,6 +41,10 @@ import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonCol
import org.apache.carbondata.core.scan.expression.conditional._
import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
import org.apache.carbondata.spark.util.CarbonScalaUtil
@@ -50,6 +54,7 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
*/
object CarbonFilters {
+ val carbonProperties = CarbonProperties.getInstance()
/**
* Converts data sources filters to carbon filter predicates.
*/
@@ -117,25 +122,20 @@ object CarbonFilters {
new OrExpression(lhsFilter, rhsFilter)
}
case sources.StringStartsWith(name, value) if value.length > 0 =>
- val l = new GreaterThanEqualToExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value))
- val maxValueLimit = value.substring(0, value.length - 1) +
- (value.charAt(value.length - 1).toInt + 1).toChar
- val r = new LessThanExpression(
- getCarbonExpression(name), getCarbonLiteralExpression(name, maxValueLimit))
- Some(new AndExpression(l, r))
+ Some(new StartsWithExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
case CarbonEndsWith(expr: Expression) =>
Some(new SparkUnknownExpression(expr.transform {
case AttributeReference(name, dataType, _, _) =>
CarbonBoundReference(new CarbonColumnExpression(name.toString,
CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
- }))
+ }, ExpressionType.ENDSWITH))
case CarbonContainsWith(expr: Expression) =>
Some(new SparkUnknownExpression(expr.transform {
case AttributeReference(name, dataType, _, _) =>
CarbonBoundReference(new CarbonColumnExpression(name.toString,
CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
- }))
+ }, ExpressionType.CONTAINSWITH))
case CastExpr(expr: Expression) =>
Some(transformExpression(expr))
case FalseExpr() =>
@@ -266,7 +266,7 @@ object CarbonFilters {
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
- case StartsWith(a: Attribute, Literal(v, t)) =>
+ case s@StartsWith(a: Attribute, Literal(v, t)) =>
Some(sources.StringStartsWith(a.name, v.toString))
case c@EndsWith(a: Attribute, Literal(v, t)) =>
Some(CarbonEndsWith(c))
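The CarbonFilters hunks above replace the old prefix-range rewrite for StringStartsWith (a GreaterThanEqualTo/LessThan pair) with a first-class StartsWithExpression, and tag the EndsWith/ContainsWith cases with explicit ExpressionTypes instead of UNKNOWN. Presumably this is what lets the DataMap chooser match a pushed-down filter against a datamap whose DataMapMeta advertises the corresponding expression types. A hedged sketch of how a custom factory advertises that support, mirroring the getMeta changes earlier in this commit (dataMapSchema and imports as in those files):

    override def getMeta: DataMapMeta = {
      // Columns this datamap indexes, from DMPROPERTIES('indexcolumns'='...').
      val indexedColumns =
        dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava
      // Expression types this datamap can prune on; the chooser skips it for others.
      new DataMapMeta(indexedColumns, List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
    }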
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 66f8bc5..f514074 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.processing.store.TablePage;
/**
@@ -50,9 +50,9 @@ public class DataMapWriterListener {
/**
* register all datamap writer for specified table and segment
*/
- public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId,
+ public void registerAllWriter(CarbonTable carbonTable, String segmentId,
String dataWritePath) {
- List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier);
+ List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
if (tableDataMaps != null) {
for (TableDataMap tableDataMap : tableDataMaps) {
DataMapFactory factory = tableDataMap.getDataMapFactory();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 0ee77da..21a4b3e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -263,7 +263,7 @@ public final class DataLoadProcessBuilder {
if (carbonTable.isHivePartitionTable()) {
configuration.setWritingCoresCount((short) 1);
}
- TableSpec tableSpec = new TableSpec(dimensions, measures);
+ TableSpec tableSpec = new TableSpec(carbonTable);
configuration.setTableSpec(tableSpec);
return configuration;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/56330ae2/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index d6af747..8aa5bde 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -260,8 +260,8 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
DataMapWriterListener listener = new DataMapWriterListener();
- listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId(),
- storeLocation[new Random().nextInt(storeLocation.length)]);
+ listener.registerAllWriter(configuration.getTableSpec().getCarbonTable(),
+ configuration.getSegmentId(), storeLocation[new Random().nextInt(storeLocation.length)]);
carbonFactDataHandlerModel.dataMapWriterlistener = listener;
carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
@@ -320,13 +320,11 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
- carbonFactDataHandlerModel.tableSpec = new TableSpec(
- segmentProperties.getDimensions(),
- segmentProperties.getMeasures());
-
+ carbonFactDataHandlerModel.tableSpec =
+ new TableSpec(loadModel.getCarbonDataLoadSchema().getCarbonTable());
DataMapWriterListener listener = new DataMapWriterListener();
listener.registerAllWriter(
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(),
+ loadModel.getCarbonDataLoadSchema().getCarbonTable(),
loadModel.getSegmentId(),
tempStoreLocation[new Random().nextInt(tempStoreLocation.length)]);
carbonFactDataHandlerModel.dataMapWriterlistener = listener;