You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/25 12:54:37 UTC
[03/10] carbondata git commit: [CARBONDATA-1543] Supported DataMap
chooser and expression for supporting multiple datamaps in single query
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index 5eb274d..86fd240 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -18,18 +18,6 @@
package org.apache.carbondata.cluster.sdv.generated
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.common.util._
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
/**
* Test Class for AlterTableTestCase to verify all scenerios
*/
@@ -52,7 +40,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phoneP
ADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSy
sVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
- assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
+ assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") <= 2)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("DROP TABLE IF EXISTS carbon_automation_merge")
@@ -75,8 +63,8 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSy
sVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSy
sVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
val rows = sql("""Select count(*) from carbon_automation_nonmerge""").collect()
- assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
- assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2)
+ assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") <= 2)
+ assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") <= 2)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'SEGMENT_INDEX'").collect()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index b3bf93d..0d33797 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
@@ -272,7 +272,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
test("test pre agg create table 22: using invalid datamap provider") {
sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
- val e: Exception = intercept[Exception] {
+ val e = intercept[MalformedDataMapCommandException] {
sql(
"""
| CREATE DATAMAP agg0 ON TABLE mainTable
@@ -282,8 +282,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
| GROUP BY column3,column5,column2
""".stripMargin)
}
- assert(e.getMessage.contains(
- s"Unknown data map type abc"))
+ assert(e.getMessage.contains("DataMap class 'abc' not found"))
sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index 97aa056..49cabea 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -200,7 +200,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
| GROUP BY dataTime
""".stripMargin)
}
- assert(e.getMessage.equals("Unknown data map type abc"))
+ assert(e.getMessage.equals("DataMap class 'abc' not found"))
}
test("test timeseries create table: USING and catch MalformedCarbonCommandException") {
@@ -215,10 +215,11 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
| GROUP BY dataTime
""".stripMargin)
}
- assert(e.getMessage.equals("Unknown data map type abc"))
+ assert(e.getMessage.equals("DataMap class 'abc' not found"))
}
test("test timeseries create table: Only one granularity level can be defined 1") {
+ sql("drop datamap if exists agg0_second on table mainTable")
val e: Exception = intercept[MalformedCarbonCommandException] {
sql(
s"""
@@ -235,6 +236,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
| GROUP BY dataTime
""".stripMargin)
}
+ e.printStackTrace()
assert(e.getMessage.equals("Only one granularity level can be defined"))
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
index c522c1e..e31896f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
@@ -113,7 +113,7 @@ object CompactionSupportGlobalSortBigFileTest {
try {
val write = new PrintWriter(fileName);
for (i <- start until (start + line)) {
- write.println(i + "," + "n" + i + "," + "c" + Random.nextInt(line) + "," + Random.nextInt(80))
+ write.println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + Random.nextInt(80))
}
write.close()
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 6f03493..47ef192 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -86,18 +86,18 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
buildTestData
}
-test("test the boolean data type"){
- booldf.write
- .format("carbondata")
- .option("tableName", "carbon10")
- .option("tempCSV", "true")
- .option("compress", "true")
- .mode(SaveMode.Overwrite)
- .save()
- checkAnswer(
- sql("SELECT * FROM CARBON10"),
- Seq(Row("anubhav", true), Row("prince", false)))
-}
+ test("test the boolean data type"){
+ booldf.write
+ .format("carbondata")
+ .option("tableName", "carbon0")
+ .option("tempCSV", "true")
+ .option("compress", "true")
+ .mode(SaveMode.Overwrite)
+ .save()
+ checkAnswer(
+ sql("SELECT * FROM CARBON0"),
+ Seq(Row("anubhav", true), Row("prince", false)))
+ }
test("test load dataframe with saving compressed csv files") {
// save dataframe to carbon file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 4b6f231..d4c49d2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.indexstore.Blocklet
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
@@ -48,30 +49,28 @@ import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlo
class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
var identifier: AbsoluteTableIdentifier = _
- var dataMapName: String = _
+ var dataMapSchema: DataMapSchema = _
/**
* Initialization of Datamap factory with the identifier and datamap name
*/
- override def init(identifier: AbsoluteTableIdentifier,
- dataMapName: String): Unit = {
+ override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
this.identifier = identifier
- this.dataMapName = dataMapName
+ this.dataMapSchema = dataMapSchema
}
/**
* Return a new write for this datamap
*/
override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
- new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+ new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
}
/**
* Get the datamap for segmentid
*/
- override def getDataMaps(segmentId: String): java.util.List[AbstractCoarseGrainDataMap] = {
- val file = FileFactory.getCarbonFile(
- CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+ override def getDataMaps(segmentId: String) = {
+ val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
val files = file.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -108,9 +107,8 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
*
* @return
*/
- override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
- val file = FileFactory.getCarbonFile(
- CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+ override def toDistributable(segmentId: String) = {
+ val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
val files = file.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -140,7 +138,8 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
* Return metadata of this datamap
*/
override def getMeta: DataMapMeta = {
- new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+ new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+ List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
}
}
@@ -198,12 +197,16 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
}
private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
- if (expression.getChildren != null) {
- expression.getChildren.asScala.map { f =>
- if (f.isInstanceOf[EqualToExpression]) {
- buffer += f
+ if (expression.isInstanceOf[EqualToExpression]) {
+ buffer += expression
+ } else {
+ if (expression.getChildren != null) {
+ expression.getChildren.asScala.map { f =>
+ if (f.isInstanceOf[EqualToExpression]) {
+ buffer += f
+ }
+ getEqualToExpression(f, buffer)
}
- getEqualToExpression(f, buffer)
}
}
}
@@ -221,12 +224,12 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
segmentId: String,
dataWritePath: String,
- dataMapName: String)
+ dataMapSchema: DataMapSchema)
extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
var currentBlockId: String = null
val cgwritepath = dataWritePath + "/" +
- dataMapName + System.nanoTime() + ".datamap"
+ dataMapSchema.getDataMapName + System.nanoTime() + ".datamap"
lazy val stream: DataOutputStream = FileFactory
.getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath))
val blockletList = new ArrayBuffer[Array[Byte]]()
@@ -345,14 +348,29 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
""".stripMargin)
val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg")
// register datamap writer
- DataMapStoreManager.getInstance().createAndRegisterDataMap(
- table.getAbsoluteTableIdentifier,
- classOf[CGDataMapFactory].getName, "cgdatamap")
+ sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')")
checkAnswer(sql("select * from datamap_test_cg where name='n502670'"),
sql("select * from normal_test where name='n502670'"))
}
+ test("test cg datamap with 2 datamaps ") {
+ sql("DROP TABLE IF EXISTS datamap_test")
+ sql(
+ """
+ | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+ // register datamap writer
+ sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')")
+ sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+ checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+ sql("select * from normal_test where name='n502670' and city='c2670'"))
+ }
+
override protected def afterAll(): Unit = {
CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
sql("DROP TABLE IF EXISTS normal_test")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index f694a6b..903610a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Da
import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.util.CarbonProperties
@@ -41,7 +42,7 @@ class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
var identifier: AbsoluteTableIdentifier = _
override def init(identifier: AbsoluteTableIdentifier,
- dataMapName: String): Unit = {
+ dataMapSchema: DataMapSchema): Unit = {
this.identifier = identifier
}
@@ -89,12 +90,9 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
}
test("test write datamap 2 pages") {
+ sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
// register datamap writer
- DataMapStoreManager.getInstance().createAndRegisterDataMap(
- AbsoluteTableIdentifier.from(storeLocation + "/carbon1", "default", "carbon1"),
- classOf[C2DataMapFactory].getName,
- "test")
-
+ sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'")
val df = buildTestData(33000)
// save dataframe to carbon file
@@ -119,11 +117,8 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
}
test("test write datamap 2 blocklet") {
- // register datamap writer
- DataMapStoreManager.getInstance().createAndRegisterDataMap(
- AbsoluteTableIdentifier.from(storeLocation + "/carbon2", "default", "carbon2"),
- classOf[C2DataMapFactory].getName,
- "test")
+ sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+ sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2DataMapFactory].getName}'")
CarbonProperties.getInstance()
.addProperty("carbon.blockletgroup.size.in.mb", "1")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index d1bb65f..8031dc2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.indexstore.FineGrainBlocklet
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
@@ -48,22 +49,21 @@ import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlo
class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
var identifier: AbsoluteTableIdentifier = _
- var dataMapName: String = _
+ var dataMapSchema: DataMapSchema = _
/**
* Initialization of Datamap factory with the identifier and datamap name
*/
- override def init(identifier: AbsoluteTableIdentifier,
- dataMapName: String): Unit = {
+ override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
this.identifier = identifier
- this.dataMapName = dataMapName
+ this.dataMapSchema = dataMapSchema
}
/**
* Return a new write for this datamap
*/
override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
- new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+ new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
}
/**
@@ -137,7 +137,8 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
* Return metadata of this datamap
*/
override def getMeta: DataMapMeta = {
- new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava)
+ new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+ List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
}
}
@@ -228,12 +229,16 @@ class FGDataMap extends AbstractFineGrainDataMap {
}
def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
- if (expression.getChildren != null) {
- expression.getChildren.asScala.map { f =>
- if (f.isInstanceOf[EqualToExpression]) {
- buffer += f
+ if (expression.isInstanceOf[EqualToExpression]) {
+ buffer += expression
+ } else {
+ if (expression.getChildren != null) {
+ expression.getChildren.asScala.map { f =>
+ if (f.isInstanceOf[EqualToExpression]) {
+ buffer += f
+ }
+ getEqualToExpression(f, buffer)
}
- getEqualToExpression(f, buffer)
}
}
}
@@ -249,11 +254,12 @@ class FGDataMap extends AbstractFineGrainDataMap {
}
class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
- segmentId: String, dataWriterPath: String, dataMapName: String)
+ segmentId: String, dataWriterPath: String, dataMapSchema: DataMapSchema)
extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
var currentBlockId: String = null
- val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap"
+ val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
+ ".datamap"
val stream: DataOutputStream = FileFactory
.getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
@@ -424,14 +430,44 @@ class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
""".stripMargin)
val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
// register datamap writer
- DataMapStoreManager.getInstance().createAndRegisterDataMap(
- table.getAbsoluteTableIdentifier,
- classOf[FGDataMapFactory].getName, "fgdatamap")
+ sql(
+ s"""
+ | CREATE DATAMAP ggdatamap ON TABLE datamap_test
+ | USING '${classOf[FGDataMapFactory].getName}'
+ | DMPROPERTIES('indexcolumns'='name')
+ """.stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
checkAnswer(sql("select * from datamap_test where name='n502670'"),
sql("select * from normal_test where name='n502670'"))
}
+ test("test fg datamap with 2 datamaps ") {
+ sql("DROP TABLE IF EXISTS datamap_test")
+ sql(
+ """
+ | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+ // register datamap writer
+ sql(
+ s"""
+ | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test
+ | USING '${classOf[FGDataMapFactory].getName}'
+ | DMPROPERTIES('indexcolumns'='name')
+ """.stripMargin)
+ sql(
+ s"""
+ | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test
+ | USING '${classOf[FGDataMapFactory].getName}'
+ | DMPROPERTIES('indexcolumns'='city')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+ checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+ sql("select * from normal_test where name='n502670' and city='c2670'"))
+ }
+
override protected def afterAll(): Unit = {
CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
sql("DROP TABLE IF EXISTS normal_test")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 37007ed..b2ab977 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -23,7 +23,8 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.common.exceptions.MetadataProcessException
+import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.util.CarbonProperties
@@ -42,39 +43,22 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
val newClass = "org.apache.spark.sql.CarbonSource"
- test("test datamap create: don't support using class, only support short name") {
- intercept[MalformedDataMapCommandException] {
+ test("test datamap create: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
- val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
- assert(table != null)
- val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
- assert(dataMapSchemaList.size() == 1)
- assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap1"))
- assert(dataMapSchemaList.get(0).getClassName.equals(newClass))
}
}
- test("test datamap create with dmproperties: don't support using class") {
- intercept[MalformedDataMapCommandException] {
+ test("test datamap create with dmproperties: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
- val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
- assert(table != null)
- val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
- assert(dataMapSchemaList.size() == 2)
- assert(dataMapSchemaList.get(1).getDataMapName.equals("datamap2"))
- assert(dataMapSchemaList.get(1).getClassName.equals(newClass))
- assert(dataMapSchemaList.get(1).getProperties.get("key").equals("value"))
}
}
- test("test datamap create with existing name: don't support using class") {
- intercept[MalformedDataMapCommandException] {
+ test("test datamap create with existing name: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
sql(
s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
- val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
- assert(table != null)
- val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
- assert(dataMapSchemaList.size() == 2)
}
}
@@ -106,8 +90,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
- }
- finally {
+ } finally {
sql("drop table hiveMetaStoreTable")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
@@ -133,8 +116,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
sql("drop table hiveMetaStoreTable_1")
checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
- }
- finally {
+ } finally {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
@@ -142,17 +124,17 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
}
test("test datamap create with preagg with duplicate name") {
- intercept[Exception] {
- sql(
- s"""
- | CREATE DATAMAP datamap2 ON TABLE datamaptest
- | USING 'preaggregate'
- | DMPROPERTIES('key'='value')
- | AS SELECT COUNT(a) FROM datamaptest
+ sql(
+ s"""
+ | CREATE DATAMAP datamap10 ON TABLE datamaptest
+ | USING 'preaggregate'
+ | DMPROPERTIES('key'='value')
+ | AS SELECT COUNT(a) FROM datamaptest
""".stripMargin)
+ intercept[MalformedDataMapCommandException] {
sql(
s"""
- | CREATE DATAMAP datamap2 ON TABLE datamaptest
+ | CREATE DATAMAP datamap10 ON TABLE datamaptest
| USING 'preaggregate'
| DMPROPERTIES('key'='value')
| AS SELECT COUNT(a) FROM datamaptest
@@ -164,10 +146,9 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
assert(dataMapSchemaList.size() == 2)
}
- test("test datamap drop with preagg") {
- intercept[Exception] {
- sql("drop table datamap3")
-
+ test("test drop non-exist datamap") {
+ intercept[NoSuchDataMapException] {
+ sql("drop datamap nonexist on table datamaptest")
}
val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
assert(table != null)
@@ -175,8 +156,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
assert(dataMapSchemaList.size() == 2)
}
- test("test show datamap without preaggregate: don't support using class") {
- intercept[MalformedDataMapCommandException] {
+ test("test show datamap without preaggregate: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
sql("drop table if exists datamapshowtest")
sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')")
@@ -185,8 +166,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
}
}
- test("test show datamap with preaggregate: don't support using class") {
- intercept[MalformedDataMapCommandException] {
+ test("test show datamap with preaggregate: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
sql("drop table if exists datamapshowtest")
sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
@@ -203,8 +184,8 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
assert(sql("show datamap on table datamapshowtest").collect().length == 0)
}
- test("test show datamap after dropping datamap: don't support using class") {
- intercept[MalformedDataMapCommandException] {
+ test("test show datamap after dropping datamap: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
sql("drop table if exists datamapshowtest")
sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
index c200b1b..84b59e6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.Event
@@ -46,10 +47,7 @@ class InsertOverwriteConcurrentTest extends QueryTest with BeforeAndAfterAll wit
buildTestData()
// register hook to the table to sleep, thus the other command will be executed
- DataMapStoreManager.getInstance().createAndRegisterDataMap(
- AbsoluteTableIdentifier.from(storeLocation + "/orders", "default", "orders"),
- classOf[WaitingDataMap].getName,
- "test")
+ sql(s"create datamap test on table orders using '${classOf[WaitingDataMap].getName}' as select count(a) from hiveMetaStoreTable_1")
}
private def buildTestData(): Unit = {
@@ -166,7 +164,7 @@ object Global {
class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
- override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { }
+ override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { }
override def fireEvent(event: Event): Unit = ???
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 60052f0..8d2f9ee 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -37,8 +37,8 @@ import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
class SparkDataMapJob extends DataMapJob {
override def execute(dataMapFormat: DistributableDataMapFormat,
- resolverIntf: FilterResolverIntf): util.List[ExtendedBlocklet] = {
- new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, resolverIntf).collect().toList
+ filter: FilterResolverIntf): util.List[ExtendedBlocklet] = {
+ new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, filter).collect().toList
.asJava
}
}
@@ -53,7 +53,6 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) exte
* RDD to prune the datamaps across spark cluster
* @param sc
* @param dataMapFormat
- * @param resolverIntf
*/
class DataMapPruneRDD(sc: SparkContext,
dataMapFormat: DistributableDataMapFormat,
@@ -70,7 +69,6 @@ class DataMapPruneRDD(sc: SparkContext,
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit
- DistributableDataMapFormat.setFilterExp(attemptContext.getConfiguration, resolverIntf)
val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext)
reader.initialize(inputSplit, attemptContext)
val iter = new Iterator[ExtendedBlocklet] {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 5dcca6d..ec26c34 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -95,8 +95,12 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
CarbonException.analysisException(s"table path already exists.")
case (SaveMode.Overwrite, true) =>
val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
- sqlContext.sparkSession
- .sql(s"DROP TABLE IF EXISTS $dbName.${options.tableName}")
+ // In order to overwrite, delete all segments in the table
+ sqlContext.sparkSession.sql(
+ s"""
+ | DELETE FROM TABLE $dbName.${options.tableName}
+ | WHERE SEGMENT.STARTTIME BEFORE '2099-06-01 01:00:00'
+ """.stripMargin)
(true, false)
case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
(true, false)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index d536746..1de66c1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -32,7 +32,9 @@ import org.apache.carbondata.core.scan.filter.intf.{ExpressionType, RowIntf}
import org.apache.carbondata.spark.util.CarbonScalaUtil
-class SparkUnknownExpression(var sparkExp: SparkExpression)
+class SparkUnknownExpression(
+ var sparkExp: SparkExpression,
+ expressionType: ExpressionType = ExpressionType.UNKNOWN)
extends UnknownExpression with ConditionalExpression {
private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
@@ -64,7 +66,7 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
}
override def getFilterExpressionType: ExpressionType = {
- ExpressionType.UNKNOWN
+ expressionType
}
override def getString: String = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index e93ab25..3d3f83b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -16,15 +16,21 @@
*/
package org.apache.spark.sql.execution.command.datamap
+import scala.collection.JavaConverters._
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
/**
* Below command class will be used to create datamap on table
@@ -48,7 +54,7 @@ case class CarbonCreateDataMapCommand(
if (carbonTable.isStreamingTable) {
throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
}
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ validateDataMapName(carbonTable)
if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
@@ -72,20 +78,44 @@ case class CarbonCreateDataMapCommand(
queryString.get
)
}
- createPreAggregateTableCommands.processMetadata(sparkSession)
+ try {
+ createPreAggregateTableCommands.processMetadata(sparkSession)
+ } catch {
+ case e: Throwable => throw new MetadataProcessException(s"Failed to create datamap " +
+ s"'$dataMapName'", e)
+ }
} else {
- throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName)
+ val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
+ dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava))
+ val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
+ Some(dbName),
+ tableIdentifier.table)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
+ DataMapStoreManager.getInstance().createAndRegisterDataMap(
+ carbonTable.getAbsoluteTableIdentifier, dataMapSchema)
+ // Save DataMapSchema in the schema file of main table
+ PreAggregateUtil.updateMainTable(carbonTable, dataMapSchema, sparkSession)
}
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${tableIdentifier.table}")
Seq.empty
}
+ private def validateDataMapName(carbonTable: CarbonTable) = {
+ val existingDataMaps = carbonTable.getTableInfo.getDataMapSchemaList
+ existingDataMaps.asScala.foreach { dataMapSchema =>
+ if (dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName)) {
+ throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exist")
+ }
+ }
+ }
+
override def processData(sparkSession: SparkSession): Seq[Row] = {
if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
createPreAggregateTableCommands.processData(sparkSession)
} else {
- throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName)
+ Seq.empty
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index e5db286..f410f52 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
+import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException}
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -72,7 +73,7 @@ case class CarbonDropDataMapCommand(
Some(CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession))
} catch {
case ex: NoSuchTableException =>
- throw ex
+ throw new MetadataProcessException(s"Dropping datamap $dataMapName failed", ex)
}
if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index bf72325..6d4822b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -64,8 +65,10 @@ case class CreatePreAggregateTableCommand(
dmProperties.foreach(t => tableProperties.put(t._1, t._2))
parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
- assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table),
- "Parent table name is different in select and create")
+ if (!parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table)) {
+ throw new MalformedDataMapCommandException(
+ "Parent table name is different in select and create")
+ }
var neworder = Seq[String]()
val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
parentOrder.foreach(parentcol =>
@@ -131,8 +134,7 @@ case class CreatePreAggregateTableCommand(
// updating the parent table about child table
PreAggregateUtil.updateMainTable(
- CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession),
- parentTableIdentifier.table,
+ parentTable,
childSchema,
sparkSession)
// After updating the parent carbon table with data map entry extract the latest table object
@@ -147,8 +149,7 @@ case class CreatePreAggregateTableCommand(
PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
parentTable.getTableName,
parentTable.getDatabaseName)
- }
- else {
+ } else {
queryString
}
val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 7e3b80e..1073f63 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.types.DataType
+import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -404,22 +405,20 @@ object PreAggregateUtil {
* Below method will be used to update the main table about the pre aggregate table information
* in case of any exception it will throw error so pre aggregate table creation will fail
*
- * @param dbName
- * @param tableName
* @param childSchema
* @param sparkSession
*/
- def updateMainTable(dbName: String, tableName: String,
+ def updateMainTable(carbonTable: CarbonTable,
childSchema: DataMapSchema, sparkSession: SparkSession): Unit = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
LockUsage.DROP_TABLE_LOCK)
var locks = List.empty[ICarbonLock]
- var carbonTable: CarbonTable = null
var numberOfCurrentChild: Int = 0
+ val dbName = carbonTable.getDatabaseName
+ val tableName = carbonTable.getTableName
try {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
// get the latest carbon table and check for column existence
// read the latest schema file
@@ -433,7 +432,7 @@ object PreAggregateUtil {
numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
if (wrapperTableInfo.getDataMapSchemaList.asScala.
exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
- throw new Exception("Duplicate datamap")
+ throw new MetadataProcessException("DataMap name already exist")
}
wrapperTableInfo.getDataMapSchemaList.add(childSchema)
val thriftTable = schemaConverter
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index f38304e..79ff15e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
-import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.MetadataCommand
import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.logging.LogServiceFactory
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 46e24dd..4b2bfa9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -610,7 +610,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
- case StartsWith(a: Attribute, Literal(v, t)) =>
+ case s@StartsWith(a: Attribute, Literal(v, t)) =>
Some(sources.StringStartsWith(a.name, v.toString))
case c@EndsWith(a: Attribute, Literal(v, t)) =>
Some(CarbonEndsWith(c))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 4d91375..c062cfb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -36,8 +36,10 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
import org.apache.carbondata.core.scan.expression.conditional._
import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
import org.apache.carbondata.spark.util.CarbonScalaUtil
@@ -47,6 +49,7 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
*/
object CarbonFilters {
+ val carbonProperties = CarbonProperties.getInstance()
/**
* Converts data sources filters to carbon filter predicates.
*/
@@ -114,25 +117,20 @@ object CarbonFilters {
new OrExpression(lhsFilter, rhsFilter)
}
case sources.StringStartsWith(name, value) if value.length > 0 =>
- val l = new GreaterThanEqualToExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value))
- val maxValueLimit = value.substring(0, value.length - 1) +
- (value.charAt(value.length - 1).toInt + 1).toChar
- val r = new LessThanExpression(
- getCarbonExpression(name), getCarbonLiteralExpression(name, maxValueLimit))
- Some(new AndExpression(l, r))
+ Some(new StartsWithExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
case CarbonEndsWith(expr: Expression) =>
Some(new SparkUnknownExpression(expr.transform {
case AttributeReference(name, dataType, _, _) =>
CarbonBoundReference(new CarbonColumnExpression(name.toString,
CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
- }))
+ }, ExpressionType.ENDSWITH))
case CarbonContainsWith(expr: Expression) =>
Some(new SparkUnknownExpression(expr.transform {
case AttributeReference(name, dataType, _, _) =>
CarbonBoundReference(new CarbonColumnExpression(name.toString,
CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
- }))
+ }, ExpressionType.CONTAINSWITH))
case CastExpr(expr: Expression) =>
Some(transformExpression(expr))
case _ => None
@@ -261,7 +259,7 @@ object CarbonFilters {
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
- case StartsWith(a: Attribute, Literal(v, t)) =>
+ case s@StartsWith(a: Attribute, Literal(v, t)) =>
Some(sources.StringStartsWith(a.name, v.toString))
case c@EndsWith(a: Attribute, Literal(v, t)) =>
Some(CarbonEndsWith(c))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 31a6701..e817590 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.processing.store.TablePage;
/**
@@ -49,9 +49,9 @@ public class DataMapWriterListener {
/**
* register all datamap writer for specified table and segment
*/
- public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId,
+ public void registerAllWriter(CarbonTable carbonTable, String segmentId,
String dataWritePath) {
- List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier);
+ List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
if (tableDataMaps != null) {
for (TableDataMap tableDataMap : tableDataMaps) {
DataMapFactory factory = tableDataMap.getDataMapFactory();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index b8e9062..4410e2a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -233,7 +233,7 @@ public final class DataLoadProcessBuilder {
if (carbonTable.isHivePartitionTable()) {
configuration.setWritingCoresCount((short) 1);
}
- TableSpec tableSpec = new TableSpec(dimensions, measures);
+ TableSpec tableSpec = new TableSpec(carbonTable);
configuration.setTableSpec(tableSpec);
return configuration;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5733413e/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 0e9cbc5..dd59ed3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -260,8 +260,8 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
DataMapWriterListener listener = new DataMapWriterListener();
- listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId(),
- storeLocation[new Random().nextInt(storeLocation.length)]);
+ listener.registerAllWriter(configuration.getTableSpec().getCarbonTable(),
+ configuration.getSegmentId(), storeLocation[new Random().nextInt(storeLocation.length)]);
carbonFactDataHandlerModel.dataMapWriterlistener = listener;
carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
@@ -321,13 +321,11 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
- carbonFactDataHandlerModel.tableSpec = new TableSpec(
- segmentProperties.getDimensions(),
- segmentProperties.getMeasures());
-
+ carbonFactDataHandlerModel.tableSpec =
+ new TableSpec(loadModel.getCarbonDataLoadSchema().getCarbonTable());
DataMapWriterListener listener = new DataMapWriterListener();
listener.registerAllWriter(
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(),
+ loadModel.getCarbonDataLoadSchema().getCarbonTable(),
loadModel.getSegmentId(),
tempStoreLocation[new Random().nextInt(tempStoreLocation.length)]);
carbonFactDataHandlerModel.dataMapWriterlistener = listener;