You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/17 06:48:08 UTC
[1/3] carbondata git commit: [CARBONDATA-1739] Clean up store path interface
Repository: carbondata
Updated Branches:
refs/heads/master b6777fcc3 -> 5fc7f06f2
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 153b169..07491d1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -65,7 +65,7 @@ object AlterTableUtil {
sys.error(s"Table $dbName.$tableName does not exist")
}
// acquire the lock first
- val table = relation.tableMeta.carbonTable
+ val table = relation.carbonTable
val acquiredLocks = ListBuffer[ICarbonLock]()
try {
locksToBeAcquired.foreach { lock =>
@@ -133,7 +133,7 @@ object AlterTableUtil {
thriftTable: TableInfo)(sparkSession: SparkSession,
sessionState: CarbonSessionState): Unit = {
val dbName = carbonTable.getDatabaseName
- val tableName = carbonTable.getFactTableName
+ val tableName = carbonTable.getTableName
CarbonEnv.getInstance(sparkSession).carbonMetastore
.updateTableSchemaForAlter(carbonTable.getCarbonTableIdentifier,
carbonTable.getCarbonTableIdentifier,
@@ -232,10 +232,7 @@ object AlterTableUtil {
def revertAddColumnChanges(dbName: String, tableName: String, timeStamp: Long)
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
- .carbonTable
-
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
carbonTable.getCarbonTableIdentifier)
val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -262,9 +259,7 @@ object AlterTableUtil {
def revertDropColumnChanges(dbName: String, tableName: String, timeStamp: Long)
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
- .carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
carbonTable.getCarbonTableIdentifier)
val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -297,9 +292,7 @@ object AlterTableUtil {
def revertDataTypeChanges(dbName: String, tableName: String, timeStamp: Long)
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
- .carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
carbonTable.getCarbonTableIdentifier)
val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -343,30 +336,27 @@ object AlterTableUtil {
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
var locks = List.empty[ICarbonLock]
var timeStamp = 0L
- var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
var carbonTable: CarbonTable = null
try {
locks = AlterTableUtil
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
+ carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
// get the latest carbon table
// read the latest schema file
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
carbonTable.getCarbonTableIdentifier)
val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(thriftTableInfo,
- dbName,
- tableName,
- carbonTable.getTablePath)
+ val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+ thriftTableInfo,
+ dbName,
+ tableName,
+ carbonTable.getTablePath)
val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
schemaEvolutionEntry.setTimeStamp(timeStamp)
- val thriftTable = schemaConverter
- .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+ val thriftTable = schemaConverter.fromWrapperToExternalTableInfo(
+ wrapperTableInfo, dbName, tableName)
val tblPropertiesMap: mutable.Map[String, String] =
thriftTable.fact_table.getTableProperties.asScala
if (set) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index dcfbaea..c05c0f1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -18,7 +18,6 @@
package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.api.CarbonStore
@@ -40,9 +39,7 @@ object CleanFiles {
def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
storePath: String, forceTableClean: Boolean = false): Unit = {
TableAPIUtil.validateTableExists(spark, dbName, tableName)
- val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
- lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 8375762..d682b21 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -17,7 +17,6 @@
package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
- import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.api.CarbonStore
@@ -30,9 +29,7 @@ object DeleteSegmentByDate {
def deleteSegmentByDate(spark: SparkSession, dbName: String, tableName: String,
dateValue: String): Unit = {
TableAPIUtil.validateTableExists(spark, dbName, tableName)
- val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
- lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
CarbonStore.deleteLoadByDate(dateValue, dbName, tableName, carbonTable)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index 9b87504..5b58c8d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -34,9 +34,7 @@ object DeleteSegmentById {
def deleteSegmentById(spark: SparkSession, dbName: String, tableName: String,
segmentIds: Seq[String]): Unit = {
TableAPIUtil.validateTableExists(spark, dbName, tableName)
- val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
- lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
CarbonStore.deleteLoadById(segmentIds, dbName, tableName, carbonTable)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 23cba20..287191c 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -245,7 +245,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
test("Alter table add partition: List Partition") {
sql("""ALTER TABLE list_table_area ADD PARTITION ('OutSpace', 'Hi')""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val list_info = partitionInfo.getListInfo
assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -286,7 +286,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE list_table_area DROP PARTITION(2) WITH DATA""")
val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
- val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getTableName)
val partitionIds2 = partitionInfo2.getPartitionIds
val list_info2 = partitionInfo2.getListInfo
assert(partitionIds2 == List(0, 1, 3, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -304,7 +304,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
test("Alter table add partition: Range Partition") {
sql("""ALTER TABLE range_table_logdate ADD PARTITION ('2017/01/01', '2018/01/01')""")
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val range_info = partitionInfo.getRangeInfo
assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -342,7 +342,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE range_table_logdate DROP PARTITION(3) WITH DATA;""")
val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
- val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
val partitionIds1 = partitionInfo1.getPartitionIds
val range_info1 = partitionInfo1.getRangeInfo
assert(partitionIds1 == List(0, 1, 2, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -373,7 +373,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
test("Alter table split partition: List Partition") {
sql("""ALTER TABLE list_table_country SPLIT PARTITION(4) INTO ('Canada', 'Russia', '(Good, NotGood)')""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val list_info = partitionInfo.getListInfo
assert(partitionIds == List(0, 1, 2, 3, 6, 7, 8, 5).map(Integer.valueOf(_)).asJava)
@@ -415,7 +415,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE list_table_country DROP PARTITION(8)""")
val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
- val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
val partitionIds1 = partitionInfo1.getPartitionIds
val list_info1 = partitionInfo1.getListInfo
assert(partitionIds1 == List(0, 1, 2, 3, 6, 7, 5).map(Integer.valueOf(_)).asJava)
@@ -438,7 +438,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE list_table_country ADD PARTITION ('(Part1, Part2, Part3, Part4)')""".stripMargin)
sql("""ALTER TABLE list_table_country SPLIT PARTITION(9) INTO ('Part4', 'Part2', '(Part1, Part3)')""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val list_info = partitionInfo.getListInfo
assert(partitionIds == List(0, 1, 2, 3, 6, 7, 5, 10, 11, 12).map(Integer.valueOf(_)).asJava)
@@ -485,7 +485,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE list_table_area ADD PARTITION ('(One,Two, Three, Four)')""".stripMargin)
sql("""ALTER TABLE list_table_area SPLIT PARTITION(6) INTO ('One', '(Two, Three )', 'Four')""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val list_info = partitionInfo.getListInfo
assert(partitionIds == List(0, 1, 3, 4, 5, 7, 8, 9).map(Integer.valueOf(_)).asJava)
@@ -528,7 +528,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
test("Alter table split partition: Range Partition") {
sql("""ALTER TABLE range_table_logdate_split SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val rangeInfo = partitionInfo.getRangeInfo
assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava)
@@ -566,7 +566,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE range_table_logdate_split DROP PARTITION(6)""")
val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
- val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
val partitionIds1 = partitionInfo1.getPartitionIds
val rangeInfo1 = partitionInfo1.getRangeInfo
assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
@@ -586,7 +586,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
test("Alter table split partition: Range Partition + Bucket") {
sql("""ALTER TABLE range_table_bucket SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val rangeInfo = partitionInfo.getRangeInfo
assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava)
@@ -624,7 +624,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE range_table_bucket DROP PARTITION(6) WITH DATA""")
val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
- val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
val partitionIds1 = partitionInfo1.getPartitionIds
val rangeInfo1 = partitionInfo1.getRangeInfo
assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
@@ -642,7 +642,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE range_table_bucket DROP PARTITION(3)""")
val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
- val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getTableName)
val partitionIds2 = partitionInfo2.getPartitionIds
val rangeInfo2 = partitionInfo2.getRangeInfo
assert(partitionIds2 == List(0, 1, 2, 5).map(Integer.valueOf(_)).asJava)
@@ -659,7 +659,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE range_table_bucket DROP PARTITION(5)""")
val carbonTable3 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
- val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getTableName)
val partitionIds3 = partitionInfo3.getPartitionIds
val rangeInfo3 = partitionInfo3.getRangeInfo
assert(partitionIds3 == List(0, 1, 2).map(Integer.valueOf(_)).asJava)
@@ -789,7 +789,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("ALTER TABLE carbon_table_default_db ADD PARTITION ('2017')")
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_carbon_table_default_db")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val range_info = partitionInfo.getRangeInfo
assert(partitionIds == List(0, 1, 2, 3).map(Integer.valueOf(_)).asJava)
@@ -809,7 +809,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("ALTER TABLE carbondb.carbontable ADD PARTITION ('2017')")
val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("carbondb_carbontable")
- val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable1.getFactTableName)
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable1.getTableName)
val partitionIds1 = partitionInfo1.getPartitionIds
val range_info1 = partitionInfo1.getRangeInfo
assert(partitionIds1 == List(0, 1, 2, 3).map(Integer.valueOf(_)).asJava)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index a3024be..3f5d8c6 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -43,12 +43,12 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
header: String,
allDictFilePath: String): CarbonLoadModel = {
val carbonLoadModel = new CarbonLoadModel
- carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getTableName)
- val table = relation.tableMeta.carbonTable
+ carbonLoadModel.setTableName(relation.carbonTable.getDatabaseName)
+ carbonLoadModel.setDatabaseName(relation.carbonTable.getTableName)
+ val table = relation.carbonTable
val carbonSchema = new CarbonDataLoadSchema(table)
carbonLoadModel.setDatabaseName(table.getDatabaseName)
- carbonLoadModel.setTableName(table.getFactTableName)
+ carbonLoadModel.setTableName(table.getTableName)
carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
carbonLoadModel.setFactFilePath(filePath)
carbonLoadModel.setCsvHeader(header)
@@ -141,10 +141,8 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
test("Support generate global dictionary from all dictionary files") {
val header = "id,name,city,age"
val carbonLoadModel = buildCarbonLoadModel(sampleRelation, null, header, sampleAllDictionaryFile)
- GlobalDictionaryUtil
- .generateGlobalDictionary(sqlContext,
- carbonLoadModel,
- sampleRelation.tableMeta.storePath)
+ GlobalDictionaryUtil.generateGlobalDictionary(
+ sqlContext, carbonLoadModel, sampleRelation.carbonTable.getTablePath)
DictionaryTestCaseUtil.
checkDictionary(sampleRelation, "city", "shenzhen")
@@ -156,7 +154,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
GlobalDictionaryUtil
.generateGlobalDictionary(sqlContext,
carbonLoadModel,
- complexRelation.tableMeta.storePath)
+ complexRelation.carbonTable.getTablePath)
DictionaryTestCaseUtil.
checkDictionary(complexRelation, "channelsId", "1650")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index 930de43..4551120 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -38,9 +38,9 @@ object DictionaryTestCaseUtil {
* @param value a value of column
*/
def checkDictionary(relation: CarbonRelation, columnName: String, value: String) {
- val table = relation.tableMeta.carbonTable
- val dimension = table.getDimensionByName(table.getFactTableName, columnName)
- val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getFactTableName, "uniqueid")
+ val table = relation.carbonTable
+ val dimension = table.getDimensionByName(table.getTableName, columnName)
+ val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getTableName, "uniqueid")
val absoluteTableIdentifier = new AbsoluteTableIdentifier(table.getTablePath, tableIdentifier)
val columnIdentifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
dimension.getColumnIdentifier, dimension.getDataType,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index d37a68b..78ae384 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -151,12 +151,12 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
extColFilePath: String,
csvDelimiter: String = ","): CarbonLoadModel = {
val carbonLoadModel = new CarbonLoadModel
- carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getTableName)
- val table = relation.tableMeta.carbonTable
+ carbonLoadModel.setTableName(relation.carbonTable.getDatabaseName)
+ carbonLoadModel.setDatabaseName(relation.carbonTable.getTableName)
+ val table = relation.carbonTable
val carbonSchema = new CarbonDataLoadSchema(table)
carbonLoadModel.setDatabaseName(table.getDatabaseName)
- carbonLoadModel.setTableName(table.getFactTableName)
+ carbonLoadModel.setTableName(table.getTableName)
carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
carbonLoadModel.setFactFilePath(filePath)
carbonLoadModel.setCsvHeader(header)
@@ -198,7 +198,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
var carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
header, extColDictFilePath1)
GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
- extComplexRelation.tableMeta.storePath)
+ extComplexRelation.carbonTable.getTablePath)
// check whether the dictionary is generated
DictionaryTestCaseUtil.checkDictionary(
extComplexRelation, "deviceInformationId", "10086")
@@ -207,7 +207,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
header, extColDictFilePath2)
GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
- extComplexRelation.tableMeta.storePath)
+ extComplexRelation.carbonTable.getTablePath)
// check the old dictionary and whether the new distinct value is generated
DictionaryTestCaseUtil.checkDictionary(
extComplexRelation, "deviceInformationId", "10086")
@@ -220,7 +220,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
var carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
header, extColDictFilePath3)
GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
- extComplexRelation.tableMeta.storePath)
+ extComplexRelation.carbonTable.getTablePath)
// check whether the dictionary is generated
DictionaryTestCaseUtil.checkDictionary(
extComplexRelation, "channelsId", "1421|")
@@ -229,7 +229,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
carbonLoadModel = buildCarbonLoadModel(verticalDelimiteRelation, complexFilePath2,
header2, extColDictFilePath3, "|")
GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
- verticalDelimiteRelation.tableMeta.storePath)
+ verticalDelimiteRelation.carbonTable.getTablePath)
// check whether the dictionary is generated
DictionaryTestCaseUtil.checkDictionary(
verticalDelimiteRelation, "channelsId", "1431,")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 442d93e..71c3dc2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -182,9 +182,9 @@ public final class DataLoadProcessBuilder {
loadModel.getBadRecordsLocation());
CarbonMetadata.getInstance().addCarbonTable(carbonTable);
List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ carbonTable.getMeasureByTableName(carbonTable.getTableName());
Map<String, String> dateFormatMap =
CarbonDataProcessorUtil.getDateFormatMap(loadModel.getDateFormat());
List<DataField> dataFields = new ArrayList<>();
@@ -209,7 +209,7 @@ public final class DataLoadProcessBuilder {
}
}
configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
- configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName()));
+ configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getTableName()));
// configuration for one pass load: dictionary server info
configuration.setUseOnePass(loadModel.getUseOnePass());
configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index be3572c..65f70a0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -209,7 +209,7 @@ public class CarbonCompactionExecutor {
List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
for (CarbonDimension dim : dimensions) {
// check if dimension is deleted
QueryDimension queryDimension = new QueryDimension(dim.getColName());
@@ -220,7 +220,7 @@ public class CarbonCompactionExecutor {
List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ carbonTable.getMeasureByTableName(carbonTable.getTableName());
for (CarbonMeasure carbonMeasure : measures) {
// check if measure is deleted
QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 08b8600..c60bb24 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -308,7 +308,7 @@ public class CarbonCompactionUtil {
public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> columnCardinalityMap,
CarbonTable carbonTable, List<ColumnSchema> updatedColumnSchemaList) {
List<CarbonDimension> masterDimensions =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
List<Integer> updatedCardinalityList = new ArrayList<>(columnCardinalityMap.size());
for (CarbonDimension dimension : masterDimensions) {
Integer value = columnCardinalityMap.get(dimension.getColumnId());
@@ -321,7 +321,7 @@ public class CarbonCompactionUtil {
}
// add measures to the column schema list
List<CarbonMeasure> masterSchemaMeasures =
- carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ carbonTable.getMeasureByTableName(carbonTable.getTableName());
for (CarbonMeasure measure : masterSchemaMeasures) {
updatedColumnSchemaList.add(measure.getColumnSchema());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index c1df349..8f6d19c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1264,7 +1264,7 @@ public final class CarbonDataMergerUtil {
lockStatus = carbonLock.lockWithRetries();
if (lockStatus) {
LOGGER.info(
- "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+ "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
+ " for table status updation");
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
@@ -1284,18 +1284,18 @@ public final class CarbonDataMergerUtil {
}
} else {
LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
- .getDatabaseName() + "." + table.getFactTableName());
+ .getDatabaseName() + "." + table.getTableName());
}
} finally {
if (lockStatus) {
if (carbonLock.unlock()) {
LOGGER.info(
"Table unlocked successfully after table status updation" + table.getDatabaseName()
- + "." + table.getFactTableName());
+ + "." + table.getTableName());
} else {
LOGGER.error(
"Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
- .getFactTableName() + " during table status updation");
+ .getTableName() + " during table status updation");
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java b/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
deleted file mode 100644
index 09dbfff..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.merger;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-
-public class TableMeta implements Serializable {
-
- private static final long serialVersionUID = -1749874611119829431L;
-
- public CarbonTableIdentifier carbonTableIdentifier;
- public String storePath;
- public CarbonTable carbonTable;
- public String tablePath;
-
- public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath, String tablePath,
- CarbonTable carbonTable) {
- this.carbonTableIdentifier = carbonTableIdentifier;
- this.storePath = storePath;
- this.tablePath = tablePath;
- this.carbonTable = carbonTable;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
index aeddac6..36e022b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
@@ -78,7 +78,7 @@ public abstract class AbstractCarbonQueryExecutor {
List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
for (CarbonDimension dim : dimensions) {
// check if dimension is deleted
QueryDimension queryDimension = new QueryDimension(dim.getColName());
@@ -89,7 +89,7 @@ public abstract class AbstractCarbonQueryExecutor {
List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ carbonTable.getMeasureByTableName(carbonTable.getTableName());
for (CarbonMeasure carbonMeasure : measures) {
// check if measure is deleted
QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 1db414f..48c5471 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -46,7 +46,7 @@ public class RowResultProcessor {
SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
CarbonDataProcessorUtil.createLocations(tempStoreLocation);
this.segmentProperties = segProp;
- String tableName = carbonTable.getFactTableName();
+ String tableName = carbonTable.getTableName();
CarbonFactDataHandlerModel carbonFactDataHandlerModel =
CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
segProp, tableName, tempStoreLocation);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 504e7ec..75fcea3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -17,7 +17,6 @@
package org.apache.carbondata.processing.store;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -42,19 +41,15 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthEquiSplitGenerator;
import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
-import org.apache.carbondata.processing.store.file.FileManager;
-import org.apache.carbondata.processing.store.file.IFileManagerComposite;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
@@ -77,10 +72,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private CarbonFactDataWriter dataWriter;
/**
- * File manager
- */
- private IFileManagerComposite fileManager;
- /**
* total number of entries in blocklet
*/
private int entryCount;
@@ -91,11 +82,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private int pageSize;
- // This variable is true if it is dictionary dimension and its cardinality is lower than
- // property of CarbonCommonConstants.HIGH_CARDINALITY_VALUE
- // It decides whether it will do RLE encoding on data page for this dimension
- private boolean[] rleEncodingForDictDimension;
- private boolean[] isNoDictionary;
private long processedDataCount;
private ExecutorService producerExecutorService;
private List<Future<Void>> producerExecutorServiceTaskList;
@@ -130,12 +116,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private boolean processingComplete;
/**
- * boolean to check whether dimension
- * is of dictionary type or no dictionary type
- */
- private boolean[] isDictDimension;
-
- /**
* current data format version
*/
private ColumnarFormatVersion version;
@@ -146,47 +126,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel model) {
this.model = model;
initParameters(model);
-
- int numDimColumns = colGrpModel.getNoOfColumnStore() + model.getNoDictionaryCount()
- + getExpandedComplexColsCount();
- this.rleEncodingForDictDimension = new boolean[numDimColumns];
- this.isNoDictionary = new boolean[numDimColumns];
-
- int noDictStartIndex = this.colGrpModel.getNoOfColumnStore();
- // setting true value for dims of high card
- for (int i = 0; i < model.getNoDictionaryCount(); i++) {
- this.isNoDictionary[noDictStartIndex + i] = true;
- }
-
- boolean isAggKeyBlock = Boolean.parseBoolean(
- CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
- CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
- if (isAggKeyBlock) {
- int[] dimLens = model.getSegmentProperties().getDimColumnsCardinality();
- for (int i = 0; i < model.getTableSpec().getNumSimpleDimensions(); i++) {
- if (model.getSegmentProperties().getDimensions().get(i).isGlobalDictionaryEncoding()) {
- this.rleEncodingForDictDimension[i] = true;
- }
- }
-
- if (model.getDimensionCount() < dimLens.length) {
- int allColsCount = getColsCount(model.getDimensionCount());
- List<Boolean> rleWithComplex = new ArrayList<Boolean>(allColsCount);
- for (int i = 0; i < model.getDimensionCount(); i++) {
- GenericDataType complexDataType = model.getComplexIndexMap().get(i);
- if (complexDataType != null) {
- complexDataType.fillAggKeyBlock(rleWithComplex, this.rleEncodingForDictDimension);
- } else {
- rleWithComplex.add(this.rleEncodingForDictDimension[i]);
- }
- }
- this.rleEncodingForDictDimension = new boolean[allColsCount];
- for (int i = 0; i < allColsCount; i++) {
- this.rleEncodingForDictDimension[i] = rleWithComplex.get(i);
- }
- }
- }
this.version = CarbonProperties.getInstance().getFormatVersion();
StringBuffer noInvertedIdxCol = new StringBuffer();
for (CarbonDimension cd : model.getSegmentProperties().getDimensions()) {
@@ -202,13 +141,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
SortScopeOptions.SortScope sortScope = model.getSortScope();
this.colGrpModel = model.getSegmentProperties().getColumnGroupModel();
- //TODO need to pass carbon table identifier to metadata
- CarbonTable carbonTable =
- CarbonMetadata.getInstance().getCarbonTable(
- model.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + model.getTableName());
- isDictDimension =
- CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(model.getTableName()));
-
// in compaction flow the measure with decimal type will come as spark decimal.
// need to convert it to byte array.
if (model.isCompactionFlow()) {
@@ -247,19 +179,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
consumerExecutorServiceTaskList.add(consumerExecutorService.submit(consumer));
}
- private boolean[] arrangeUniqueBlockType(boolean[] aggKeyBlock) {
- int counter = 0;
- boolean[] uniqueBlock = new boolean[aggKeyBlock.length];
- for (int i = 0; i < isDictDimension.length; i++) {
- if (isDictDimension[i]) {
- uniqueBlock[i] = aggKeyBlock[counter++];
- } else {
- uniqueBlock[i] = false;
- }
- }
- return uniqueBlock;
- }
-
private void setComplexMapSurrogateIndex(int dimensionCount) {
int surrIndex = 0;
for (int i = 0; i < dimensionCount; i++) {
@@ -283,9 +202,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* @throws CarbonDataWriterException
*/
public void initialise() throws CarbonDataWriterException {
- fileManager = new FileManager();
- // todo: the fileManager seems to be useless, remove it later
- fileManager.setName(new File(model.getStoreLocation()[0]).getName());
setWritingConfiguration();
}
@@ -412,27 +328,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
}
- private int getColsCount(int columnSplit) {
- int count = 0;
- for (int i = 0; i < columnSplit; i++) {
- GenericDataType complexDataType = model.getComplexIndexMap().get(i);
- if (complexDataType != null) {
- count += complexDataType.getColsCount();
- } else count++;
- }
- return count;
- }
-
// return the number of complex column after complex columns are expanded
private int getExpandedComplexColsCount() {
return model.getExpandedComplexColsCount();
}
- // return the number of complex column
- private int getComplexColumnCount() {
- return model.getComplexIndexMap().size();
- }
-
/**
* below method will be used to close the handler
*/
@@ -519,7 +419,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
this.dataWriter = getFactDataWriter();
// initialize the channel;
this.dataWriter.initializeWriter();
- //initializeColGrpMinMax();
}
/**
@@ -571,14 +470,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
carbonDataWriterVo.setStoreLocation(model.getStoreLocation());
carbonDataWriterVo.setMeasureCount(model.getMeasureCount());
carbonDataWriterVo.setTableName(model.getTableName());
- carbonDataWriterVo.setFileManager(fileManager);
- carbonDataWriterVo.setRleEncodingForDictDim(rleEncodingForDictDimension);
- carbonDataWriterVo.setIsComplexType(isComplexTypes());
carbonDataWriterVo.setNoDictionaryCount(model.getNoDictionaryCount());
carbonDataWriterVo.setCarbonDataFileAttributes(model.getCarbonDataFileAttributes());
carbonDataWriterVo.setDatabaseName(model.getDatabaseName());
carbonDataWriterVo.setWrapperColumnSchemaList(model.getWrapperColumnSchema());
- carbonDataWriterVo.setIsDictionaryColumn(isDictDimension);
carbonDataWriterVo.setCarbonDataDirectoryPath(model.getCarbonDataDirectoryPath());
carbonDataWriterVo.setColCardinality(model.getColCardinality());
carbonDataWriterVo.setSegmentProperties(model.getSegmentProperties());
@@ -590,31 +485,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
return carbonDataWriterVo;
}
- private boolean[] isComplexTypes() {
- int noDictionaryCount = model.getNoDictionaryCount();
- int noOfColumn = colGrpModel.getNoOfColumnStore() + noDictionaryCount + getComplexColumnCount();
- int allColsCount = getColsCount(noOfColumn);
- boolean[] isComplexType = new boolean[allColsCount];
-
- List<Boolean> complexTypesList = new ArrayList<Boolean>(allColsCount);
- for (int i = 0; i < noOfColumn; i++) {
- GenericDataType complexDataType = model.getComplexIndexMap().get(i - noDictionaryCount);
- if (complexDataType != null) {
- int count = complexDataType.getColsCount();
- for (int j = 0; j < count; j++) {
- complexTypesList.add(true);
- }
- } else {
- complexTypesList.add(false);
- }
- }
- for (int i = 0; i < allColsCount; i++) {
- isComplexType[i] = complexTypesList.get(i);
- }
-
- return isComplexType;
- }
-
/**
* This method will reset the block processing count
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
deleted file mode 100644
index ddd9bf2..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.file;
-
-
-public class FileData extends FileManager {
-
- /**
- * Store Path
- */
- private String storePath;
-
- /**
- * hierarchyValueWriter
- */
-
- public FileData(String fileName, String storePath) {
- this.fileName = fileName;
- this.storePath = storePath;
- }
-
- /**
- * @return Returns the carbonDataFileTempPath.
- */
- public String getFileName() {
- return fileName;
- }
-
- /**
- * @return Returns the storePath.
- */
- public String getStorePath() {
- return storePath;
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
deleted file mode 100644
index cfa3a66..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.file;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-public class FileManager implements IFileManagerComposite {
- /**
- * listOfFileData, composite parent which holds the different objects
- */
- protected List<IFileManagerComposite> listOfFileData =
- new ArrayList<IFileManagerComposite>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
- protected String fileName;
-
- @Override public void add(IFileManagerComposite customData) {
- listOfFileData.add(customData);
- }
-
- @Override public void remove(IFileManagerComposite customData) {
- listOfFileData.remove(customData);
-
- }
-
- @Override public IFileManagerComposite get(int i) {
- return listOfFileData.get(i);
- }
-
- @Override public void setName(String name) {
- this.fileName = name;
- }
-
- /**
- * Return the size
- */
- public int size() {
- return listOfFileData.size();
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
deleted file mode 100644
index 6691772..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.file;
-
-public interface IFileManagerComposite {
- /**
- * Add the data which can be either row Folder(Composite) or File
- *
- * @param customData
- */
- void add(IFileManagerComposite customData);
-
- /**
- * Remove the CustomData type object from the IFileManagerComposite object hierarchy.
- *
- * @param customData
- */
- void remove(IFileManagerComposite customData);
-
- /**
- * get the CustomData type object name
- *
- * @return CustomDataIntf type
- */
- IFileManagerComposite get(int i);
-
- /**
- * set the CustomData type object name
- *
- * @param name
- */
- void setName(String name);
-
- /**
- * Get the size
- *
- * @return
- */
- int size();
-
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 1b6ba72..855ec03 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -60,7 +60,6 @@ import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.BlockletInfo3;
import org.apache.carbondata.format.IndexHeader;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
-import org.apache.carbondata.processing.store.file.FileData;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.io.IOUtils;
@@ -317,9 +316,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
.getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
dataWriterVo.getBucketNumber(), dataWriterVo.getTaskExtension(),
"" + dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
- String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
- FileData fileData = new FileData(actualFileNameVal, chosenTempLocation);
- dataWriterVo.getFileManager().add(fileData);
this.carbonDataFileTempPath = chosenTempLocation + File.separator
+ carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
this.fileCount++;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 26fff09..79cdd95 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -22,7 +22,6 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.file.IFileManagerComposite;
/**
* Value object for writing the data
@@ -35,12 +34,6 @@ public class CarbonDataWriterVo {
private String tableName;
- private IFileManagerComposite fileManager;
-
- private boolean[] rleEncodingForDictDim;
-
- private boolean[] isComplexType;
-
private int NoDictionaryCount;
private CarbonDataFileAttributes carbonDataFileAttributes;
@@ -49,8 +42,6 @@ public class CarbonDataWriterVo {
private List<ColumnSchema> wrapperColumnSchemaList;
- private boolean[] isDictionaryColumn;
-
private String carbonDataDirectoryPath;
private int[] colCardinality;
@@ -110,48 +101,6 @@ public class CarbonDataWriterVo {
}
/**
- * @return the fileManager
- */
- public IFileManagerComposite getFileManager() {
- return fileManager;
- }
-
- /**
- * @param fileManager the fileManager to set
- */
- public void setFileManager(IFileManagerComposite fileManager) {
- this.fileManager = fileManager;
- }
-
- /**
- * @return the rleEncodingForDictDim
- */
- public boolean[] getRleEncodingForDictDim() {
- return rleEncodingForDictDim;
- }
-
- /**
- * @param rleEncodingForDictDim the rleEncodingForDictDim to set
- */
- public void setRleEncodingForDictDim(boolean[] rleEncodingForDictDim) {
- this.rleEncodingForDictDim = rleEncodingForDictDim;
- }
-
- /**
- * @return the isComplexType
- */
- public boolean[] getIsComplexType() {
- return isComplexType;
- }
-
- /**
- * @param isComplexType the isComplexType to set
- */
- public void setIsComplexType(boolean[] isComplexType) {
- this.isComplexType = isComplexType;
- }
-
- /**
* @return the noDictionaryCount
*/
public int getNoDictionaryCount() {
@@ -208,20 +157,6 @@ public class CarbonDataWriterVo {
}
/**
- * @return the isDictionaryColumn
- */
- public boolean[] getIsDictionaryColumn() {
- return isDictionaryColumn;
- }
-
- /**
- * @param isDictionaryColumn the isDictionaryColumn to set
- */
- public void setIsDictionaryColumn(boolean[] isDictionaryColumn) {
- this.isDictionaryColumn = isDictionaryColumn;
- }
-
- /**
* @return the carbonDataDirectoryPath
*/
public String getCarbonDataDirectoryPath() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index ca40830..7218a12 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -383,7 +383,7 @@ public final class CarbonDataProcessorUtil {
*/
public static Set<String> getSchemaColumnNames(CarbonDataLoadSchema schema, String tableName) {
Set<String> columnNames = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- String factTableName = schema.getCarbonTable().getFactTableName();
+ String factTableName = schema.getCarbonTable().getTableName();
if (tableName.equals(factTableName)) {
List<CarbonDimension> dimensions =
schema.getCarbonTable().getDimensionByTableName(factTableName);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 29a979d..db3442e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -263,6 +263,11 @@ public final class CarbonLoaderUtil {
AbsoluteTableIdentifier absoluteTableIdentifier =
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
+ String metadataPath = carbonTablePath.getMetadataDirectoryPath();
+ FileType fileType = FileFactory.getFileType(metadataPath);
+ if (!FileFactory.isFileExist(metadataPath, fileType)) {
+ FileFactory.mkdirs(metadataPath, fileType);
+ }
String tableStatusPath = carbonTablePath.getTableStatusFilePath();
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 58cc019..e09e3db 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -297,9 +297,9 @@ public class StoreCreator {
String header = reader.readLine();
String[] split = header.split(",");
List<CarbonColumn> allCols = new ArrayList<CarbonColumn>();
- List<CarbonDimension> dims = table.getDimensionByTableName(table.getFactTableName());
+ List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName());
allCols.addAll(dims);
- List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getFactTableName());
+ List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getTableName());
allCols.addAll(msrs);
Set<String>[] set = new HashSet[dims.size()];
for (int i = 0; i < set.length; i++) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 943858d..7682437 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -66,7 +66,7 @@ public class StreamSegment {
try {
if (carbonLock.lockWithRetries()) {
LOGGER.info(
- "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+ "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
+ " for stream table get or create segment");
LoadMetadataDetails[] details =
@@ -104,17 +104,17 @@ public class StreamSegment {
} else {
LOGGER.error(
"Not able to acquire the lock for stream table get or create segment for table " + table
- .getDatabaseName() + "." + table.getFactTableName());
+ .getDatabaseName() + "." + table.getTableName());
throw new IOException("Failed to get stream segment");
}
} finally {
if (carbonLock.unlock()) {
LOGGER.info("Table unlocked successfully after stream table get or create segment" + table
- .getDatabaseName() + "." + table.getFactTableName());
+ .getDatabaseName() + "." + table.getTableName());
} else {
LOGGER.error(
"Unable to unlock table lock for stream table" + table.getDatabaseName() + "." + table
- .getFactTableName() + " during stream table get or create segment");
+ .getTableName() + " during stream table get or create segment");
}
}
}
@@ -132,7 +132,7 @@ public class StreamSegment {
try {
if (carbonLock.lockWithRetries()) {
LOGGER.info(
- "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+ "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
+ " for stream table finish segment");
LoadMetadataDetails[] details =
@@ -165,17 +165,17 @@ public class StreamSegment {
} else {
LOGGER.error(
"Not able to acquire the lock for stream table status updation for table " + table
- .getDatabaseName() + "." + table.getFactTableName());
+ .getDatabaseName() + "." + table.getTableName());
throw new IOException("Failed to get stream segment");
}
} finally {
if (carbonLock.unlock()) {
LOGGER.info(
"Table unlocked successfully after table status updation" + table.getDatabaseName()
- + "." + table.getFactTableName());
+ + "." + table.getTableName());
} else {
LOGGER.error("Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
- .getFactTableName() + " during table status updation");
+ .getTableName() + " during table status updation");
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 31ed1f6..2c4d35f 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -147,7 +147,7 @@ object StreamSinkFactory {
val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, parameters)
optionsFinal.put("sort_scope", "no_sort")
if (parameters.get("fileheader").isEmpty) {
- optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getFactTableName)
+ optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName)
.asScala.map(_.getColName).mkString(","))
}
val carbonLoadModel = new CarbonLoadModel()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
index 6ee3296..c2789f4 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
@@ -42,14 +42,14 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
LockUsage.STREAMING_LOCK)
if (lock.lockWithRetries()) {
LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName + "." +
- carbonTable.getFactTableName)
+ carbonTable.getTableName)
cache.put(event.id, lock)
} else {
LOGGER.error("Not able to acquire the lock for stream table:" +
- carbonTable.getDatabaseName + "." + carbonTable.getFactTableName)
+ carbonTable.getDatabaseName + "." + carbonTable.getTableName)
throw new InterruptedException(
"Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
- carbonTable.getFactTableName)
+ carbonTable.getTableName)
}
}
}
[2/3] carbondata git commit: [CARBONDATA-1739] Clean up store path interface
Posted by qi...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index 822455c..64a066c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -47,9 +47,7 @@ case class CarbonDataMapShowCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
val schemaList = carbonTable.getTableInfo.getDataMapSchemaList
if (schemaList != null && schemaList.size() > 0) {
schemaList.asScala.map { s =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 66f2756..f34afbf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events._
@@ -60,12 +61,11 @@ case class CarbonDropDataMapCommand(
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
val identifier = TableIdentifier(tableName, Option(dbName))
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
val carbonEnv = CarbonEnv.getInstance(sparkSession)
val catalog = carbonEnv.carbonMetastore
val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonEnv.getInstance(sparkSession).storePath)
+ CarbonProperties.getStorePath)
val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
val tableIdentifier =
AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
@@ -76,20 +76,19 @@ case class CarbonDropDataMapCommand(
lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
}
LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]")
- val carbonTable: Option[CarbonTable] =
- catalog.getTableFromMetadataCache(dbName, tableName) match {
- case Some(tableMeta) => Some(tableMeta.carbonTable)
- case None => try {
- Some(catalog.lookupRelation(identifier)(sparkSession)
- .asInstanceOf[CarbonRelation].metaData.carbonTable)
- } catch {
- case ex: NoSuchTableException =>
- if (!ifExistsSet) {
- throw ex
- }
- None
- }
+ var carbonTable: Option[CarbonTable] =
+ catalog.getTableFromMetadataCache(dbName, tableName)
+ if (carbonTable.isEmpty) {
+ try {
+ carbonTable = Some(catalog.lookupRelation(identifier)(sparkSession)
+ .asInstanceOf[CarbonRelation].metaData.carbonTable)
+ } catch {
+ case ex: NoSuchTableException =>
+ if (!ifExistsSet) {
+ throw ex
+ }
}
+ }
if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
@@ -144,7 +143,7 @@ case class CarbonDropDataMapCommand(
// delete the table folder
val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonEnv.getInstance(sparkSession).storePath)
+ CarbonProperties.getStorePath)
val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 947cea1..2f04feb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -57,21 +57,21 @@ case class AlterTableCompactionCommand(
if (relation == null) {
sys.error(s"Table $databaseName.$tableName does not exist")
}
- if (null == relation.tableMeta.carbonTable) {
+ if (null == relation.carbonTable) {
LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
sys.error(s"alter table failed. table not found: $databaseName.$tableName")
}
val carbonLoadModel = new CarbonLoadModel()
- val table = relation.tableMeta.carbonTable
- carbonLoadModel.setTableName(table.getFactTableName)
+ val table = relation.carbonTable
+ carbonLoadModel.setTableName(table.getTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setTablePath(relation.tableMeta.carbonTable.getTablePath)
+ carbonLoadModel.setTableName(relation.carbonTable.getTableName)
+ carbonLoadModel.setDatabaseName(relation.carbonTable.getDatabaseName)
+ carbonLoadModel.setTablePath(relation.carbonTable.getTablePath)
var storeLocation = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index 2003bb1..32d6b80 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -37,9 +37,7 @@ case class CarbonShowLoadsCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
CarbonStore.showSegments(
GetDB.getDatabaseName(databaseNameOp, sparkSession),
tableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
index 8b0dab7..58e33b7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, Runn
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
case class CleanFilesCommand(
@@ -38,7 +39,7 @@ case class CleanFilesCommand(
if (forceTableClean) {
val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonEnv.getInstance(sparkSession).storePath)
+ CarbonProperties.getStorePath)
// TODO: TAABLEPATH
CarbonStore.cleanFiles(
dbName,
@@ -47,10 +48,7 @@ case class CleanFilesCommand(
null,
forceTableClean)
} else {
- val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val relation = catalog
- .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
val cleanFilesPreEvent: CleanFilesPreEvent =
CleanFilesPreEvent(carbonTable,
sparkSession)
@@ -59,7 +57,7 @@ case class CleanFilesCommand(
CarbonStore.cleanFiles(
GetDB.getDatabaseName(databaseNameOp, sparkSession),
tableName,
- relation.asInstanceOf[CarbonRelation].tableMeta.storePath,
+ CarbonProperties.getStorePath,
carbonTable,
forceTableClean)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
index 6a0465c..5b305ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
@@ -35,9 +35,7 @@ case class DeleteLoadByIdCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
val operationContext = new OperationContext
val deleteSegmentByIdPreEvent: DeleteSegmentByIdPreEvent =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
index 83f41bb..00c35a5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
@@ -37,9 +37,7 @@ case class DeleteLoadByLoadDateCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
val operationContext = new OperationContext
val deleteSegmentByDatePreEvent: DeleteSegmentByDatePreEvent =
DeleteSegmentByDatePreEvent(carbonTable,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
index 3f0e093..845a64c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
@@ -47,7 +47,7 @@ case class LoadTableByInsertCommand(
Some(df)).run(sparkSession)
// updating relation metadata. This is in case of auto detect high cardinality
relation.carbonRelation.metaData =
- CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
+ CarbonSparkUtil.createSparkMeta(relation.carbonRelation.carbonTable)
load
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 777c169..0f4ca01 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.statusmanager.SegmentStatus
@@ -54,7 +55,8 @@ case class LoadTableCommand(
isOverwriteTable: Boolean,
var inputSqlString: String = null,
dataFrame: Option[DataFrame] = None,
- updateModel: Option[UpdateTableModel] = None)
+ updateModel: Option[UpdateTableModel] = None,
+ var tableInfoOp: Option[TableInfo] = None)
extends RunnableCommand with DataProcessCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -72,16 +74,6 @@ case class LoadTableCommand(
}
val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- if (relation == null) {
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- if (null == relation.tableMeta.carbonTable) {
- LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
- LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
- sys.error(s"Data loading failed. table not found: $dbName.$tableName")
- }
val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
carbonProperty.addProperty("zookeeper.enable.lock", "false")
@@ -105,18 +97,30 @@ case class LoadTableCommand(
// update the property with new value
carbonProperty.addProperty(CarbonCommonConstants.NUM_CORES_LOADING, numCoresLoading)
- val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
-
- val tableProperties = relation.tableMeta.carbonTable.getTableInfo
- .getFactTable.getTableProperties
+ try {
+ val table = if (tableInfoOp.isDefined) {
+ CarbonTable.buildFromTableInfo(tableInfoOp.get)
+ } else {
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+ if (relation == null) {
+ sys.error(s"Table $dbName.$tableName does not exist")
+ }
+ if (null == relation.carbonTable) {
+ LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
+ LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+ sys.error(s"Data loading failed. table not found: $dbName.$tableName")
+ }
+ relation.carbonTable
+ }
- optionsFinal.put("sort_scope", tableProperties.getOrDefault("sort_scope",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+ val tableProperties = table.getTableInfo.getFactTable.getTableProperties
+ val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+ optionsFinal.put("sort_scope", tableProperties.getOrDefault("sort_scope",
+ carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+ carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
- try {
- val table = relation.tableMeta.carbonTable
val carbonLoadModel = new CarbonLoadModel()
val factPath = if (dataFrame.isDefined) {
""
@@ -137,11 +141,9 @@ case class LoadTableCommand(
// First system has to partition the data first and then call the load data
LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
- val storePath = relation.tableMeta.storePath
// add the start entry for the new load in the table status file
if (updateModel.isEmpty) {
- CommonUtil.
- readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteTable)
+ CommonUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
}
if (isOverwriteTable) {
LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
@@ -158,8 +160,7 @@ case class LoadTableCommand(
carbonLoadModel.setUseOnePass(false)
}
// Create table and metadata folders if not exist
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(storePath, table.getCarbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
val fileType = FileFactory.getFileType(metadataDirectoryPath)
if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
@@ -192,9 +193,9 @@ case class LoadTableCommand(
} finally {
// Once the data load is successful delete the unwanted partition files
try {
- val partitionLocation = relation.tableMeta.storePath + "/partition/" +
+ val partitionLocation = CarbonProperties.getStorePath + "/partition/" +
table.getDatabaseName + "/" +
- table.getFactTableName + "/"
+ table.getTableName + "/"
val fileType = FileFactory.getFileType(partitionLocation)
if (FileFactory.isFileExist(partitionLocation, fileType)) {
val file = FileFactory
@@ -234,7 +235,7 @@ case class LoadTableCommand(
.getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
val dimensions = carbonTable.getDimensionByTableName(
- carbonTable.getFactTableName).asScala.toArray
+ carbonTable.getTableName).asScala.toArray
val colDictFilePath = carbonLoadModel.getColDictFilePath
if (!StringUtils.isEmpty(colDictFilePath)) {
carbonLoadModel.initPredefDictMap()
@@ -378,8 +379,7 @@ case class LoadTableCommand(
val identifier = model.table.getCarbonTableIdentifier
// update CarbonDataLoadSchema
val carbonTable = metastore.lookupRelation(Option(identifier.getDatabaseName),
- identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
- .carbonTable
+ identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].carbonTable
carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index a52008a..efb6796 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -31,7 +31,6 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -64,24 +63,18 @@ object DeleteExecution {
sparkSession: SparkSession,
dataRdd: RDD[Row],
timestamp: String,
- relation: CarbonRelation,
isUpdateOperation: Boolean,
- executorErrors: ExecutionErrors
- ): Boolean = {
+ executorErrors: ExecutionErrors): Boolean = {
var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] = null
val tableName = getTableIdentifier(identifier).table
val database = GetDB.getDatabaseName(getTableIdentifier(identifier).database, sparkSession)
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
- asInstanceOf[CarbonRelation]
-
- val absoluteTableIdentifier = relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
+ val carbonTable = CarbonEnv.getCarbonTable(Some(database), tableName)(sparkSession)
+ val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val carbonTablePath = CarbonStorePath
.getCarbonTablePath(absoluteTableIdentifier)
val factPath = carbonTablePath.getFactDir
- val carbonTable = relation.tableMeta.carbonTable
var deleteStatus = true
val deleteRdd = if (isUpdateOperation) {
val schema =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 34daf4e..6762489 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -54,7 +54,7 @@ object HorizontalCompaction {
}
var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
- val carbonTable = carbonRelation.tableMeta.carbonTable
+ val carbonTable = carbonRelation.carbonTable
val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val updateTimeStamp = System.currentTimeMillis()
// To make sure that update and delete timestamps are not same,
@@ -116,7 +116,7 @@ object HorizontalCompaction {
factTimeStamp: Long,
segLists: util.List[String]): Unit = {
val db = carbonTable.getDatabaseName
- val table = carbonTable.getFactTableName
+ val table = carbonTable.getTableName
// get the valid segments qualified for update compaction.
val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
absTableIdentifier,
@@ -133,7 +133,7 @@ object HorizontalCompaction {
try {
// Update Compaction.
val alterTableModel = AlterTableModel(Option(carbonTable.getDatabaseName),
- carbonTable.getFactTableName,
+ carbonTable.getTableName,
Some(segmentUpdateStatusManager),
CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
Some(factTimeStamp),
@@ -167,7 +167,7 @@ object HorizontalCompaction {
segLists: util.List[String]): Unit = {
val db = carbonTable.getDatabaseName
- val table = carbonTable.getFactTableName
+ val table = carbonTable.getTableName
val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
absTableIdentifier,
segmentUpdateStatusManager,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
index b18ab78..5817d88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
@@ -60,7 +60,7 @@ object IUDCommonUtil {
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
.getDatabaseName + "." +
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
- .getFactTableName
+ .getTableName
val sementProperty = carbonProperties
.getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbAndTb, "")
if (!(sementProperty.equals("") || sementProperty.trim.equals("*"))) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
index a898822..cf5bfd8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -51,7 +51,7 @@ private[sql] case class ProjectForDeleteCommand(
val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
+ val carbonTable = relation.carbonTable
// trigger event for Delete from table
val operationContext = new OperationContext
@@ -77,9 +77,8 @@ private[sql] case class ProjectForDeleteCommand(
// handle the clean up of IUD.
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
- if (DeleteExecution
- .deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp, relation,
- isUpdateOperation = false, executorErrors)) {
+ if (DeleteExecution.deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp,
+ isUpdateOperation = false, executorErrors)) {
// call IUD Compaction.
HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation,
isUpdateOperation = false)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index 549c58f..da62f27 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -30,7 +30,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
import org.apache.carbondata.processing.loading.FailureCauses
@@ -58,7 +57,7 @@ private[sql] case class ProjectForUpdateCommand(
val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(DeleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
+ val carbonTable = relation.carbonTable
// trigger event for Update table
val operationContext = new OperationContext
@@ -74,7 +73,7 @@ private[sql] case class ProjectForUpdateCommand(
val currentTime = CarbonUpdateUtil.readCurrentTime
// var dataFrame: DataFrame = null
var dataSet: DataFrame = null
- val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset()
+ val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset
try {
lockStatus = metadataLock.lockWithRetries()
if (lockStatus) {
@@ -83,7 +82,6 @@ private[sql] case class ProjectForUpdateCommand(
else {
throw new Exception("Table is locked for updation. Please try after some time")
}
- val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
// Get RDD.
dataSet = if (isPersistEnabled) {
@@ -93,7 +91,7 @@ private[sql] case class ProjectForUpdateCommand(
else {
Dataset.ofRows(sparkSession, plan)
}
- var executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+ val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
// handle the clean up of IUD.
@@ -101,8 +99,7 @@ private[sql] case class ProjectForUpdateCommand(
// do delete operation.
DeleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
- currentTime + "",
- relation, isUpdateOperation = true, executionErrors)
+ currentTime + "", isUpdateOperation = true, executionErrors)
if(executionErrors.failureCauses != FailureCauses.NONE) {
throw new Exception(executionErrors.errorMsg)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
index acd9bd3..5a0e4cc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
@@ -65,8 +65,7 @@ case class AlterTableDropCarbonPartitionCommand(
val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
- val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
- val tablePath = relation.tableMeta.tablePath
+ val tablePath = relation.carbonTable.getTablePath
carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
if (relation == null) {
sys.error(s"Table $dbName.$tableName does not exist")
@@ -75,7 +74,7 @@ case class AlterTableDropCarbonPartitionCommand(
LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
sys.error(s"Alter table failed. table not found: $dbName.$tableName")
}
- val table = relation.tableMeta.carbonTable
+ val table = relation.carbonTable
val partitionInfo = table.getPartitionInfo(tableName)
if (partitionInfo == null) {
sys.error(s"Table $tableName is not a partition table.")
@@ -101,7 +100,7 @@ case class AlterTableDropCarbonPartitionCommand(
sys.error(s"Dropping range interval partition isn't support yet!")
}
partitionInfo.dropPartition(partitionIndex)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
val schemaFilePath = carbonTablePath.getSchemaFilePath
// read TableInfo
val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -142,17 +141,13 @@ case class AlterTableDropCarbonPartitionCommand(
locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
locksToBeAcquired)(sparkSession)
val carbonLoadModel = new CarbonLoadModel()
- val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
- val table = relation.tableMeta.carbonTable
+ val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setTablePath(relation.tableMeta.tablePath)
+ carbonLoadModel.setTableName(table.getTableName)
+ carbonLoadModel.setDatabaseName(table.getDatabaseName)
+ carbonLoadModel.setTablePath(table.getTablePath)
val loadStartTime = CarbonUpdateUtil.readCurrentTime
carbonLoadModel.setFactTimeStamp(loadStartTime)
alterTableDropPartition(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
index 0973226..841da67 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -69,8 +69,7 @@ case class AlterTableSplitCarbonPartitionCommand(
val tableName = splitPartitionModel.tableName
val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
- val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
- val tablePath = relation.tableMeta.tablePath
+ val tablePath = relation.carbonTable.getTablePath
if (relation == null) {
sys.error(s"Table $dbName.$tableName does not exist")
}
@@ -79,7 +78,7 @@ case class AlterTableSplitCarbonPartitionCommand(
LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
sys.error(s"Alter table failed. table not found: $dbName.$tableName")
}
- val table = relation.tableMeta.carbonTable
+ val table = relation.carbonTable
val partitionInfo = table.getPartitionInfo(tableName)
val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
// keep a copy of partitionIdList before update partitionInfo.
@@ -95,7 +94,7 @@ case class AlterTableSplitCarbonPartitionCommand(
updatePartitionInfo(partitionInfo, partitionIds)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
val schemaFilePath = carbonTablePath.getSchemaFilePath
// read TableInfo
val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -150,16 +149,12 @@ case class AlterTableSplitCarbonPartitionCommand(
locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
locksToBeAcquired)(sparkSession)
val carbonLoadModel = new CarbonLoadModel()
- val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- val tablePath = relation.tableMeta.tablePath
- val table = relation.tableMeta.carbonTable
- val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+ val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+ val tablePath = table.getTablePath
val dataLoadSchema = new CarbonDataLoadSchema(table)
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
+ carbonLoadModel.setTableName(table.getTableName)
+ carbonLoadModel.setDatabaseName(table.getDatabaseName)
carbonLoadModel.setTablePath(tablePath)
val loadStartTime = CarbonUpdateUtil.readCurrentTime
carbonLoadModel.setFactTimeStamp(loadStartTime)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
index 224304a..903e93b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
@@ -41,10 +41,9 @@ private[sql] case class ShowCarbonPartitionsCommand(
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(tableIdentifier)(sparkSession).
- asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
- val tableName = carbonTable.getFactTableName
+ .lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+ val carbonTable = relation.carbonTable
+ val tableName = carbonTable.getTableName
val partitionInfo = carbonTable.getPartitionInfo(
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
if (partitionInfo == null) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index dd002f0..3854f76 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -73,7 +73,7 @@ case class CreatePreAggregateTableCommand(
// getting the parent table
val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
// getting the table name
- val parentTableName = parentTable.getFactTableName
+ val parentTableName = parentTable.getTableName
// getting the db name of parent table
val parentDbName = parentTable.getDatabaseName
@@ -85,9 +85,8 @@ case class CreatePreAggregateTableCommand(
tableModel.dataMapRelation = Some(fieldRelationMap)
CarbonCreateTableCommand(tableModel).run(sparkSession)
try {
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation( tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
- val tableInfo = relation.tableMeta.carbonTable.getTableInfo
+ val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
+ val tableInfo = table.getTableInfo
// child schema object which will be updated on parent table about the
val childSchema = tableInfo.getFactTable
.buildChildSchema(dataMapName, CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 506a405..f64deec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -84,7 +84,7 @@ object PreAggregateDataTypeChangePreListener extends OperationEventListener {
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
s"Cannot change data type for columns in pre-aggregate table ${ carbonTable.getDatabaseName
- }.${ carbonTable.getFactTableName }")
+ }.${ carbonTable.getTableName }")
}
}
}
@@ -102,7 +102,7 @@ object PreAggregateAddColumnsPreListener extends OperationEventListener {
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
s"Cannot add columns in pre-aggreagate table ${ carbonTable.getDatabaseName
- }.${ carbonTable.getFactTableName }")
+ }.${ carbonTable.getTableName }")
}
}
}
@@ -185,7 +185,7 @@ object PreAggregateDropColumnPreListener extends OperationEventListener {
}
if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(s"Cannot drop columns in pre-aggreagate table ${
- carbonTable.getDatabaseName}.${ carbonTable.getFactTableName }")
+ carbonTable.getDatabaseName}.${ carbonTable.getTableName }")
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 3193310..1647f9e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -326,21 +326,19 @@ object PreAggregateUtil {
var numberOfCurrentChild: Int = 0
try {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
+ carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
// get the latest carbon table and check for column existence
// read the latest schema file
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(
+ carbonTable.getAbsoluteTableIdentifier)
val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(thriftTableInfo,
- dbName,
- tableName,
- carbonTable.getTablePath)
+ val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+ thriftTableInfo,
+ dbName,
+ tableName,
+ carbonTable.getTablePath)
numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
if (wrapperTableInfo.getDataMapSchemaList.asScala.
exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
@@ -374,7 +372,7 @@ object PreAggregateUtil {
def updateSchemaInfo(carbonTable: CarbonTable,
thriftTable: TableInfo)(sparkSession: SparkSession): Unit = {
val dbName = carbonTable.getDatabaseName
- val tableName = carbonTable.getFactTableName
+ val tableName = carbonTable.getTableName
CarbonEnv.getInstance(sparkSession).carbonMetastore
.updateTableSchemaForDataMap(carbonTable.getCarbonTableIdentifier,
carbonTable.getCarbonTableIdentifier,
@@ -435,31 +433,30 @@ object PreAggregateUtil {
def revertMainTableChanges(dbName: String, tableName: String, numberOfChildSchema: Int)
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
- .carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
carbonTable.getTableLastUpdatedTime
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
- metastore
- .revertTableSchemaForPreAggCreationFailure(carbonTable.getAbsoluteTableIdentifier,
- thriftTable)(sparkSession)
+ metastore.revertTableSchemaForPreAggCreationFailure(
+ carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)
}
}
def getChildCarbonTable(databaseName: String, tableName: String)
(sparkSession: SparkSession): Option[CarbonTable] = {
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- metaStore.getTableFromMetadataCache(databaseName, tableName) match {
- case Some(tableMeta) => Some(tableMeta.carbonTable)
- case None => try {
+ val carbonTable = metaStore.getTableFromMetadataCache(databaseName, tableName)
+ if (carbonTable.isEmpty) {
+ try {
Some(metaStore.lookupRelation(Some(databaseName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation].metaData.carbonTable)
} catch {
case _: Exception =>
None
}
+ } else {
+ carbonTable
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 2132131..3b39334 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -57,9 +57,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
// older carbon table and this can lead to inconsistent state in the system. Therefor look
// up relation should be called after acquiring the lock
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
+ carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val alterTableAddColumnListener = AlterTableAddColumnPreEvent(carbonTable,
alterTableAddColumnsModel)
OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index e44899e..c24a8e9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -51,9 +51,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
locks = AlterTableUtil
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
+ carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val alterTableDataTypeChangeListener = AlterTableDataTypeChangePreEvent(carbonTable,
alterTableDataTypeChangeModel)
OperationListenerBus.getInstance().fireEvent(alterTableDataTypeChangeListener)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index dae2d7b..721dd0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -53,9 +53,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
locks = AlterTableUtil
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
+ carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val partitionInfo = carbonTable.getPartitionInfo(tableName)
if (partitionInfo != null) {
val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index f1cce13..e7beedd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -81,9 +81,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
locks = AlterTableUtil
.validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
sparkSession)
- val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
- .asInstanceOf[CarbonRelation].tableMeta
- carbonTable = tableMeta.carbonTable
+ carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+ .asInstanceOf[CarbonRelation].carbonTable
// invalid data map for the old table, see CARBON-1690
val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
@@ -134,7 +133,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
carbonTable.getCarbonTableIdentifier,
tableInfo,
schemaEvolutionEntry,
- tableMeta.tablePath)(sparkSession)
+ carbonTable.getTablePath)(sparkSession)
val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
carbonTable,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index c126b25..a060833 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -357,11 +357,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
private def getPartitioning(carbonTable: CarbonTable,
output: Seq[Attribute]): Partitioning = {
- val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
+ val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getTableName)
if (info != null) {
val cols = info.getListOfColumns.asScala
val sortColumn = carbonTable.
- getDimensionByTableName(carbonTable.getFactTableName).get(0).getColName
+ getDimensionByTableName(carbonTable.getTableName).get(0).getColName
val numBuckets = info.getNumberOfBuckets
val bucketColumns = cols.flatMap { n =>
val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index ef2e0a5..d6450c1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -51,8 +51,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
val dbOption = oldTableIdentifier.database.map(_.toLowerCase)
val tableIdentifier = TableIdentifier(oldTableIdentifier.table.toLowerCase(), dbOption)
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .tableExists(tableIdentifier)(
- sparkSession)
+ .tableExists(tableIdentifier)(sparkSession)
if (isCarbonTable) {
val renameModel = AlterTableRenameModel(tableIdentifier, newTableIdentifier)
ExecutedCommandExec(CarbonAlterTableRenameCommand(renameModel)) :: Nil
@@ -155,13 +154,13 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
ExecutedCommandExec(cmd) :: Nil
case AlterTableSetPropertiesCommand(tableName, properties, isView)
- if (CarbonEnv.getInstance(sparkSession).carbonMetastore
- .tableExists(tableName)(sparkSession)) => {
+ if CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(tableName)(sparkSession) => {
ExecutedCommandExec(AlterTableSetCommand(tableName, properties, isView)) :: Nil
}
case AlterTableUnsetPropertiesCommand(tableName, propKeys, ifExists, isView)
- if (CarbonEnv.getInstance(sparkSession).carbonMetastore
- .tableExists(tableName)(sparkSession)) => {
+ if CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(tableName)(sparkSession) => {
ExecutedCommandExec(AlterTableUnsetCommand(tableName, propKeys, ifExists, isView)) :: Nil
}
case _ => Nil
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 9ebf47e..49a57e6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.execution.command.{AlterTableRenameCommand, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.AlterTableRenameCommand
import org.apache.spark.sql.execution.command.mutation.{DeleteExecution, ProjectForDeleteCommand, ProjectForUpdateCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
import org.apache.spark.sql.hive.CarbonRelation
@@ -76,7 +76,6 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp
val streaming = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(tableIdentifier)(sparkSession)
.asInstanceOf[CarbonRelation]
- .tableMeta
.carbonTable
.isStreamingTable
if (streaming) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 6d80a26..87c919d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
@@ -44,13 +43,12 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.util.CarbonSparkUtil
-case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) {
+case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
// clear the metadata
def clear(): Unit = {
- tablesMeta.clear()
+ carbonTables.clear()
}
}
@@ -80,7 +78,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
System.nanoTime() + ""
}
- val metadata = MetaData(new ArrayBuffer[TableMeta]())
+ val metadata = MetaData(new ArrayBuffer[CarbonTable]())
/**
@@ -98,13 +96,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
val tables = getTableFromMetadataCache(database, tableName)
tables match {
case Some(t) =>
- CarbonRelation(database, tableName,
- CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
+ CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t)
case None =>
readCarbonSchema(absIdentifier) match {
case Some(meta) =>
CarbonRelation(database, tableName,
- CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
+ CarbonSparkUtil.createSparkMeta(meta), meta)
case None =>
throw new NoSuchTableException(database, tableName)
}
@@ -151,7 +148,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
val operationContext = new OperationContext
val lookupRelationPostEvent: LookupRelationPostEvent =
LookupRelationPostEvent(
- relation.tableMeta.carbonTable,
+ relation.carbonTable,
sparkSession)
OperationListenerBus.getInstance.fireEvent(lookupRelationPostEvent, operationContext)
relation
@@ -164,10 +161,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
* @param tableName
* @return
*/
- def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
- metadata.tablesMeta
- .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+ def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] = {
+ metadata.carbonTables
+ .find(table => table.getDatabaseName.equalsIgnoreCase(database) &&
+ table.getTableName.equalsIgnoreCase(tableName))
}
def tableExists(
@@ -187,7 +184,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
true
}
- private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
+ private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[CarbonTable] = {
val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
val tableName = identifier.getCarbonTableIdentifier.getTableName
val tablePath = identifier.getTablePath
@@ -210,12 +207,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
- val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
- identifier.getTablePath,
- identifier.getTablePath,
- carbonTable)
- metadata.tablesMeta += tableMeta
- Some(tableMeta)
+ metadata.carbonTables += carbonTable
+ Some(carbonTable)
} else {
None
}
@@ -378,10 +371,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getTablePath,
- absoluteTableIdentifier.getTablePath,
- CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
- metadata.tablesMeta += tableMeta
+ metadata.carbonTables +=
+ CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName)
}
/**
@@ -391,10 +382,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
* @param tableName
*/
def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
- val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
- metadataToBeRemoved match {
- case Some(tableMeta) =>
- metadata.tablesMeta -= tableMeta
+ val carbonTableToBeRemoved: Option[CarbonTable] = getTableFromMetadataCache(dbName, tableName)
+ carbonTableToBeRemoved match {
+ case Some(carbonTable) =>
+ metadata.carbonTables -= carbonTable
CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
case None =>
if (LOGGER.isDebugEnabled) {
@@ -409,10 +400,10 @@ class CarbonFileMetastore extends CarbonMetaStore {
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
wrapperTableInfo.getTableUniqueName)
- for (i <- metadata.tablesMeta.indices) {
+ for (i <- metadata.carbonTables.indices) {
if (wrapperTableInfo.getTableUniqueName.equals(
- metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
- metadata.tablesMeta(i).carbonTable = carbonTable
+ metadata.carbonTables(i).getTableUniqueName)) {
+ metadata.carbonTables(i) = carbonTable
}
}
}
@@ -434,8 +425,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
try {
- val tablePath = lookupRelation(tableIdentifier)(sparkSession).
- asInstanceOf[CarbonRelation].tableMeta.tablePath
+ val tablePath = lookupRelation(tableIdentifier)(sparkSession)
+ .asInstanceOf[CarbonRelation].carbonTable.getTablePath
val fileType = FileFactory.getFileType(tablePath)
FileFactory.isFileExist(tablePath, fileType)
} catch {
@@ -531,13 +522,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
}
private def refreshCache() {
- metadata.tablesMeta.clear()
+ metadata.carbonTables.clear()
}
override def isReadFromHiveMetaStore: Boolean = false
override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
- metadata.tablesMeta.map(_.carbonTable)
+ metadata.carbonTables
override def getThriftTableInfo(tablePath: CarbonTablePath)
(sparkSession: SparkSession): TableInfo = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index dedaf1c..4d4229a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -21,20 +21,17 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.format
import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.processing.merger.TableMeta
-import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil}
+import org.apache.carbondata.spark.util.CarbonSparkUtil
/**
* Metastore to store carbonschema in hive
@@ -56,10 +53,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
val info = CarbonUtil.convertGsonToTableInfo(parameters.asJava)
if (info != null) {
val table = CarbonTable.buildFromTableInfo(info)
- val meta = new TableMeta(table.getCarbonTableIdentifier,
- absIdentifier.getTablePath, absIdentifier.getTablePath, table)
CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
- CarbonSparkUtil.createSparkMeta(table), meta)
+ CarbonSparkUtil.createSparkMeta(table), table)
} else {
super.createCarbonRelation(parameters, absIdentifier, sparkSession)
}
@@ -107,7 +102,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
carbonTable.getDatabaseName,
- carbonTable.getFactTableName)
+ carbonTable.getTableName)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 357a812..696342f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.processing.merger.TableMeta
+
/**
* Interface for Carbonmetastore
@@ -140,7 +140,7 @@ trait CarbonMetaStore {
def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
- def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta]
+ def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 3fb0db0..c48e6e8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -761,7 +761,7 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
.DEFAULT_MAX_NUMBER_OF_COLUMNS
)
}
- val isAggregateTable = !relation.carbonRelation.tableMeta.carbonTable.getTableInfo
+ val isAggregateTable = !relation.carbonRelation.carbonTable.getTableInfo
.getParentRelationIdentifiers.isEmpty
// transform logical plan if the load is for aggregate table.
val childPlan = if (isAggregateTable) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 2c476ed..9187fe2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -29,10 +29,10 @@ import org.apache.spark.sql.types._
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.merger.TableMeta
/**
* Represents logical plan for one carbon table
@@ -41,7 +41,7 @@ case class CarbonRelation(
databaseName: String,
tableName: String,
var metaData: CarbonMetaData,
- tableMeta: TableMeta)
+ carbonTable: CarbonTable)
extends LeafNode with MultiInstanceRelation {
def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
@@ -84,17 +84,17 @@ case class CarbonRelation(
}
override def newInstance(): LogicalPlan = {
- CarbonRelation(databaseName, tableName, metaData, tableMeta)
+ CarbonRelation(databaseName, tableName, metaData, carbonTable)
.asInstanceOf[this.type]
}
- val dimensionsAttr = {
+ val dimensionsAttr: Seq[AttributeReference] = {
val sett = new LinkedHashSet(
- tableMeta.carbonTable.getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName)
+ carbonTable.getDimensionByTableName(carbonTable.getTableName)
.asScala.asJava)
sett.asScala.toSeq.map(dim => {
val dimval = metaData.carbonTable
- .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
+ .getDimensionByName(metaData.carbonTable.getTableName, dim.getColName)
val output: DataType = dimval.getDataType.getName.toLowerCase match {
case "array" =>
CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
@@ -113,11 +113,10 @@ case class CarbonRelation(
}
val measureAttr = {
- val factTable = tableMeta.carbonTable.getFactTableName
+ val factTable = carbonTable.getTableName
new LinkedHashSet(
- tableMeta.carbonTable.
- getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
- asScala.asJava).asScala.toSeq.map { x =>
+ carbonTable.getMeasureByTableName(carbonTable.getTableName).asScala.asJava).asScala.toSeq
+ .map { x =>
val metastoreType = metaData.carbonTable.getMeasureByName(factTable, x.getColName)
.getDataType.getName.toLowerCase match {
case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
@@ -131,7 +130,7 @@ case class CarbonRelation(
}
override val output = {
- val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
+ val columns = carbonTable.getCreateOrderColumn(carbonTable.getTableName)
.asScala
// convert each column to Attribute
columns.filter(!_.isInvisible).map { column =>
@@ -196,12 +195,11 @@ case class CarbonRelation(
def sizeInBytes: Long = {
val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
- tableMeta.carbonTable.getAbsoluteTableIdentifier)
+ carbonTable.getAbsoluteTableIdentifier)
if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
val tablePath = CarbonStorePath.getCarbonTablePath(
- tableMeta.storePath,
- tableMeta.carbonTableIdentifier).getPath
+ carbonTable.getAbsoluteTableIdentifier).getPath
val fileType = FileFactory.getFileType(tablePath)
if(FileFactory.isFileExist(tablePath, fileType)) {
tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index e587395..b0aecd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParser
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events._
/**
@@ -106,15 +107,15 @@ class CarbonSessionCatalog(
alias: Option[String],
carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
var isRefreshed = false
- val storePath = CarbonEnv.getInstance(sparkSession).storePath
+ val storePath = CarbonProperties.getStorePath
carbonEnv.carbonMetastore.
checkSchemasModifiedTimeAndReloadTables()
- val tableMeta = carbonEnv.carbonMetastore
- .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
- carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
- if (tableMeta.isEmpty || (tableMeta.isDefined &&
- tableMeta.get.carbonTable.getTableLastUpdatedTime !=
+ val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
+ carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
+ carbonDatasourceHadoopRelation.carbonTable.getTableName)
+ if (table.isEmpty || (table.isDefined &&
+ table.get.getTableLastUpdatedTime !=
carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
refreshTable(identifier)
DataMapStoreManager.getInstance().
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index b76b24e..9cc5d86 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -42,11 +42,11 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
var databaseLocation = ""
try {
databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonEnv.getInstance(sparkSession).storePath)
+ CarbonProperties.getStorePath)
} catch {
case e: NoSuchDatabaseException =>
// ignore the exception as exception will be handled by hive command.run
- databaseLocation = CarbonEnv.getInstance(sparkSession).storePath
+ databaseLocation = CarbonProperties.getStorePath
}
// DropHiveDB command will fail if cascade is false and one or more table exists in database
if (command.cascade && tablesInDB != null) {
[3/3] carbondata git commit: [CARBONDATA-1739] Clean up store path interface
Posted by qi...@apache.org.
[CARBONDATA-1739] Clean up store path interface
This closes #1509
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5fc7f06f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5fc7f06f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5fc7f06f
Branch: refs/heads/master
Commit: 5fc7f06f23e944719b2735b97176d68fe209ad75
Parents: b6777fc
Author: Jacky Li <ja...@qq.com>
Authored: Thu Nov 16 19:41:19 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Fri Nov 17 14:46:19 2017 +0800
----------------------------------------------------------------------
.../dictionary/ManageDictionaryAndBTree.java | 2 +-
.../core/metadata/CarbonMetadata.java | 2 +-
.../core/metadata/schema/table/CarbonTable.java | 4 +-
.../core/mutate/CarbonUpdateUtil.java | 8 +-
.../carbondata/core/scan/model/QueryModel.java | 4 +-
.../carbondata/core/util/CarbonProperties.java | 7 +
.../core/metadata/CarbonMetadataTest.java | 9 +-
.../metadata/schema/table/CarbonTableTest.java | 3 +-
.../table/CarbonTableWithComplexTypesTest.java | 2 +-
.../carbondata/examples/StreamExample.scala | 4 +-
.../carbondata/hadoop/CarbonInputFormat.java | 2 +-
.../hadoop/api/CarbonTableInputFormat.java | 4 +-
.../streaming/CarbonStreamRecordReader.java | 10 +-
.../streaming/CarbonStreamRecordWriter.java | 4 +-
.../hadoop/util/CarbonInputFormatUtil.java | 6 +-
.../hadoop/test/util/StoreCreator.java | 4 +-
.../presto/impl/CarbonTableReader.java | 2 +-
.../presto/util/CarbonDataStoreCreator.scala | 4 +-
.../TestPreAggregateTableSelection.scala | 2 +-
.../partition/TestDDLForPartitionTable.scala | 6 +-
...ForPartitionTableWithDefaultProperties.scala | 8 +-
.../carbondata/spark/load/ValidateUtil.scala | 4 +-
.../spark/rdd/AlterTableLoadPartitionRDD.scala | 2 +-
.../spark/rdd/NewCarbonDataLoadRDD.scala | 2 +-
.../carbondata/spark/rdd/PartitionDropper.scala | 2 +-
.../spark/rdd/PartitionSplitter.scala | 2 +-
.../carbondata/spark/util/CommonUtil.scala | 32 +----
.../carbondata/spark/util/DataLoadingUtil.scala | 8 +-
.../spark/util/GlobalDictionaryUtil.scala | 12 +-
.../command/carbonTableSchemaCommon.scala | 4 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 12 +-
.../carbondata/spark/util/CarbonSparkUtil.scala | 18 ++-
.../spark/sql/CarbonDataFrameWriter.scala | 3 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 4 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 16 +--
.../scala/org/apache/spark/sql/CarbonEnv.scala | 36 ++++-
.../scala/org/apache/spark/sql/CarbonScan.scala | 6 +-
.../org/apache/spark/sql/CarbonSource.scala | 20 +--
.../command/CarbonCreateTableCommand.scala | 4 +-
.../CarbonDescribeFormattedCommand.scala | 20 +--
.../command/CarbonDropTableCommand.scala | 9 +-
.../datamap/CarbonDataMapShowCommand.scala | 4 +-
.../datamap/CarbonDropDataMapCommand.scala | 31 +++--
.../AlterTableCompactionCommand.scala | 12 +-
.../management/CarbonShowLoadsCommand.scala | 4 +-
.../command/management/CleanFilesCommand.scala | 10 +-
.../management/DeleteLoadByIdCommand.scala | 4 +-
.../DeleteLoadByLoadDateCommand.scala | 4 +-
.../management/LoadTableByInsertCommand.scala | 2 +-
.../command/management/LoadTableCommand.scala | 62 ++++-----
.../command/mutation/DeleteExecution.scala | 13 +-
.../command/mutation/HorizontalCompaction.scala | 8 +-
.../command/mutation/IUDCommonUtil.scala | 2 +-
.../mutation/ProjectForDeleteCommand.scala | 7 +-
.../mutation/ProjectForUpdateCommand.scala | 11 +-
.../AlterTableDropCarbonPartitionCommand.scala | 19 +--
.../AlterTableSplitCarbonPartitionCommand.scala | 19 +--
.../partition/ShowCarbonPartitionsCommand.scala | 7 +-
.../CreatePreAggregateTableCommand.scala | 7 +-
.../preaaggregate/PreAggregateListeners.scala | 6 +-
.../preaaggregate/PreAggregateUtil.scala | 37 +++---
.../CarbonAlterTableAddColumnCommand.scala | 4 +-
.../CarbonAlterTableDataTypeChangeCommand.scala | 4 +-
.../CarbonAlterTableDropColumnCommand.scala | 4 +-
.../schema/CarbonAlterTableRenameCommand.scala | 7 +-
.../strategy/CarbonLateDecodeStrategy.scala | 4 +-
.../sql/execution/strategy/DDLStrategy.scala | 11 +-
.../strategy/StreamingTableStrategy.scala | 3 +-
.../spark/sql/hive/CarbonFileMetastore.scala | 61 ++++-----
.../spark/sql/hive/CarbonHiveMetaStore.scala | 13 +-
.../apache/spark/sql/hive/CarbonMetaStore.scala | 4 +-
.../sql/hive/CarbonPreAggregateRules.scala | 2 +-
.../apache/spark/sql/hive/CarbonRelation.scala | 26 ++--
.../spark/sql/hive/CarbonSessionState.scala | 13 +-
.../execution/command/CarbonHiveCommands.scala | 4 +-
.../org/apache/spark/util/AlterTableUtil.scala | 36 ++---
.../org/apache/spark/util/CleanFiles.scala | 5 +-
.../apache/spark/util/DeleteSegmentByDate.scala | 5 +-
.../apache/spark/util/DeleteSegmentById.scala | 4 +-
.../partition/TestAlterPartitionTable.scala | 32 ++---
.../spark/util/AllDictionaryTestCase.scala | 16 +--
.../spark/util/DictionaryTestCaseUtil.scala | 6 +-
.../util/ExternalColumnDictionaryTestCase.scala | 16 +--
.../loading/DataLoadProcessBuilder.java | 6 +-
.../merger/CarbonCompactionExecutor.java | 4 +-
.../processing/merger/CarbonCompactionUtil.java | 4 +-
.../processing/merger/CarbonDataMergerUtil.java | 8 +-
.../carbondata/processing/merger/TableMeta.java | 42 ------
.../spliter/AbstractCarbonQueryExecutor.java | 4 +-
.../partition/spliter/RowResultProcessor.java | 2 +-
.../store/CarbonFactDataHandlerColumnar.java | 130 -------------------
.../processing/store/file/FileData.java | 52 --------
.../processing/store/file/FileManager.java | 59 ---------
.../store/file/IFileManagerComposite.java | 57 --------
.../store/writer/AbstractFactDataWriter.java | 4 -
.../store/writer/CarbonDataWriterVo.java | 65 ----------
.../util/CarbonDataProcessorUtil.java | 2 +-
.../processing/util/CarbonLoaderUtil.java | 5 +
.../carbondata/processing/StoreCreator.java | 4 +-
.../streaming/segment/StreamSegment.java | 16 +--
.../streaming/StreamSinkFactory.scala | 2 +-
.../CarbonStreamingQueryListener.scala | 6 +-
102 files changed, 423 insertions(+), 911 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
index a6c89e0..f8d2495 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
@@ -112,7 +112,7 @@ public class ManageDictionaryAndBTree {
}
// clear dictionary cache from LRU cache
List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
for (CarbonDimension dimension : dimensions) {
removeDictionaryColumnFromCache(carbonTable.getAbsoluteTableIdentifier(),
dimension.getColumnId());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
index 75fe78b..2face7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -120,7 +120,7 @@ public final class CarbonMetadata {
public CarbonDimension getCarbonDimensionBasedOnColIdentifier(CarbonTable carbonTable,
String columnIdentifier) {
List<CarbonDimension> listOfCarbonDims =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
for (CarbonDimension dimension : listOfCarbonDims) {
if (dimension.getColumnId().equals(columnIdentifier)) {
return dimension;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f76ddc9..ac580cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -347,7 +347,7 @@ public class CarbonTable implements Serializable {
/**
* @return the tabelName
*/
- public String getFactTableName() {
+ public String getTableName() {
return absoluteTableIdentifier.getCarbonTableIdentifier().getTableName();
}
@@ -569,7 +569,7 @@ public class CarbonTable implements Serializable {
}
public boolean isPartitionTable() {
- return null != tablePartitionMap.get(getFactTableName());
+ return null != tablePartitionMap.get(getTableName());
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 29cf62a..0b531dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -208,7 +208,7 @@ public class CarbonUpdateUtil {
lockStatus = carbonLock.lockWithRetries();
if (lockStatus) {
LOGGER.info(
- "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+ "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
+ " for table status updation");
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
@@ -257,18 +257,18 @@ public class CarbonUpdateUtil {
status = true;
} else {
LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
- .getDatabaseName() + "." + table.getFactTableName());
+ .getDatabaseName() + "." + table.getTableName());
}
} finally {
if (lockStatus) {
if (carbonLock.unlock()) {
LOGGER.info(
"Table unlocked successfully after table status updation" + table.getDatabaseName()
- + "." + table.getFactTableName());
+ + "." + table.getTableName());
} else {
LOGGER.error(
"Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
- .getFactTableName() + " during table status updation");
+ .getTableName() + " during table status updation");
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 66dfa61..67b8681 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -122,7 +122,7 @@ public class QueryModel implements Serializable {
public static QueryModel createModel(AbsoluteTableIdentifier absoluteTableIdentifier,
CarbonQueryPlan queryPlan, CarbonTable carbonTable, DataTypeConverter converter) {
QueryModel queryModel = new QueryModel();
- String factTableName = carbonTable.getFactTableName();
+ String factTableName = carbonTable.getTableName();
queryModel.setAbsoluteTableIdentifier(absoluteTableIdentifier);
fillQueryModel(queryPlan, carbonTable, queryModel, factTableName);
@@ -141,7 +141,7 @@ public class QueryModel implements Serializable {
if (null != queryPlan.getFilterExpression()) {
boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
boolean[] isFilterMeasures =
- new boolean[carbonTable.getNumberOfMeasures(carbonTable.getFactTableName())];
+ new boolean[carbonTable.getNumberOfMeasures(carbonTable.getTableName())];
processFilterExpression(queryPlan.getFilterExpression(),
carbonTable.getDimensionByTableName(factTableName),
carbonTable.getMeasureByTableName(factTableName), isFilterDimensions, isFilterMeasures);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 678a6f7..436950b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -482,6 +482,13 @@ public final class CarbonProperties {
}
/**
+ * Return the store path
+ */
+ public static String getStorePath() {
+ return getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION);
+ }
+
+ /**
* This method will be used to get the properties value
*
* @param key
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
index 0de160a..5361fb0 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -94,7 +93,7 @@ public class CarbonMetadataTest {
@Test public void testGetCarbonTableReturingProperTableWithProperFactTableName() {
String expectedResult = "carbonTestTable";
- assertEquals(expectedResult, carbonMetadata.getCarbonTable(tableUniqueName).getFactTableName());
+ assertEquals(expectedResult, carbonMetadata.getCarbonTable(tableUniqueName).getTableName());
}
@Test public void testGetCarbonTableReturingProperTableWithProperTableUniqueName() {
@@ -171,7 +170,7 @@ public class CarbonMetadataTest {
carbonDimensions.add(new CarbonDimension(colSchema1, 1, 1, 2, 1));
carbonDimensions.add(new CarbonDimension(colSchema2, 2, 2, 2, 2));
new MockUp<CarbonTable>() {
- @Mock public String getFactTableName() {
+ @Mock public String getTableName() {
return "carbonTestTable";
}
@@ -200,7 +199,7 @@ public class CarbonMetadataTest {
colSchema2.setColumnUniqueId("2");
carbonChildDimensions.add(new CarbonDimension(colSchema3, 1, 1, 2, 1));
new MockUp<CarbonTable>() {
- @Mock public String getFactTableName() {
+ @Mock public String getTableName() {
return "carbonTestTable";
}
@@ -242,7 +241,7 @@ public class CarbonMetadataTest {
carbonChildDimensions.add(new CarbonDimension(colSchema2, 1, 1, 2, 1));
new MockUp<CarbonTable>() {
- @Mock public String getFactTableName() {
+ @Mock public String getTableName() {
return "carbonTestTable";
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
index 8b66233..a47b7fd 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -57,7 +56,7 @@ public class CarbonTableTest extends TestCase {
}
@Test public void testFactTableNameReturnsProperFactTableName() {
- assertEquals("carbonTestTable", carbonTable.getFactTableName());
+ assertEquals("carbonTestTable", carbonTable.getTableName());
}
@Test public void testTableUniqueNameIsProper() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
index e9caf4a..84312cd 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
@@ -55,7 +55,7 @@ public class CarbonTableWithComplexTypesTest extends TestCase {
}
@Test public void testFactTableNameReturnsProperFactTableName() {
- assertEquals("carbonTestTable", carbonTable.getFactTableName());
+ assertEquals("carbonTestTable", carbonTable.getTableName());
}
@Test public void testTableUniqueNameIsProper() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
index 4b59aad..43d545d 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
@@ -89,9 +89,7 @@ object StreamExample {
| """.stripMargin)
}
- val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
- lookupRelation(Some("default"), streamTableName)(spark).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
// batch load
val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 0aa2974..88d8341 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -364,7 +364,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
TableProvider tableProvider = new SingleTableProvider(carbonTable);
CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
BitSet matchedPartitions = null;
- PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
+ PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
if (partitionInfo != null) {
// prune partitions for filter query on partition table
matchedPartitions = setMatchedPartitions(null, carbonTable, filter, partitionInfo);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 6e840e2..552455a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -393,7 +393,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
Expression filter = getFilterPredicates(job.getConfiguration());
TableProvider tableProvider = new SingleTableProvider(carbonTable);
// this will be null in case of corrupt schema file.
- PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
+ PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
// prune partitions for filter query on partition table
@@ -787,7 +787,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
Expression filter = getFilterPredicates(configuration);
boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
boolean[] isFilterMeasures =
- new boolean[carbonTable.getNumberOfMeasures(carbonTable.getFactTableName())];
+ new boolean[carbonTable.getNumberOfMeasures(carbonTable.getTableName())];
CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions,
isFilterMeasures);
queryModel.setIsFilterDimensions(isFilterDimensions);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
index a22461d..bdd7c28 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -153,13 +153,13 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
}
carbonTable = model.getTable();
List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
dimensionCount = dimensions.size();
List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ carbonTable.getMeasureByTableName(carbonTable.getTableName());
measureCount = measures.size();
List<CarbonColumn> carbonColumnList =
- carbonTable.getStreamStorageOrderColumn(carbonTable.getFactTableName());
+ carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
@@ -224,8 +224,8 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
private void initializeFilter() {
List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
- .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
- carbonTable.getMeasureByTableName(carbonTable.getFactTableName()));
+ .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+ carbonTable.getMeasureByTableName(carbonTable.getTableName()));
int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
for (int i = 0; i < dimLensWithComplex.length; i++) {
dimLensWithComplex[i] = Integer.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
index 7df87e3..fdd0504 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
@@ -251,8 +251,8 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
private void writeFileHeader() throws IOException {
List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
- .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
- carbonTable.getMeasureByTableName(carbonTable.getFactTableName()));
+ .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+ carbonTable.getMeasureByTableName(carbonTable.getTableName()));
int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
for (int i = 0; i < dimLensWithComplex.length; i++) {
dimLensWithComplex[i] = Integer.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 630828a..3afad94 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -52,7 +52,7 @@ public class CarbonInputFormatUtil {
if (columnString != null) {
columns = columnString.split(",");
}
- String factTableName = carbonTable.getFactTableName();
+ String factTableName = carbonTable.getTableName();
CarbonQueryPlan plan = new CarbonQueryPlan(carbonTable.getDatabaseName(), factTableName);
// fill dimensions
// If columns are null, set all dimensions and measures
@@ -120,9 +120,9 @@ public class CarbonInputFormatUtil {
public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable,
boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ carbonTable.getMeasureByTableName(carbonTable.getTableName());
QueryModel.processFilterExpression(filterExpression, dimensions, measures,
isFilterDimensions, isFilterMeasures);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index b4145ef..c45f910 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -309,9 +309,9 @@ public class StoreCreator {
String header = reader.readLine();
String[] split = header.split(",");
List<CarbonColumn> allCols = new ArrayList<CarbonColumn>();
- List<CarbonDimension> dims = table.getDimensionByTableName(table.getFactTableName());
+ List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName());
allCols.addAll(dims);
- List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getFactTableName());
+ List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getTableName());
allCols.addAll(msrs);
Set<String>[] set = new HashSet[dims.size()];
for (int i = 0; i < set.length; i++) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 8e6abd4..f72bb7a 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -363,7 +363,7 @@ public class CarbonTableReader {
.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier(), null).getPath();
config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
- config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getFactTableName());
+ config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
try {
CarbonTableInputFormat.setTableInfo(config, tableInfo);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 17a4188..1430baf 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -333,10 +333,10 @@ object CarbonDataStoreCreator {
val split: Array[String] = header.split(",")
val allCols: util.List[CarbonColumn] = new util.ArrayList[CarbonColumn]()
val dims: util.List[CarbonDimension] =
- table.getDimensionByTableName(table.getFactTableName)
+ table.getDimensionByTableName(table.getTableName)
allCols.addAll(dims)
val msrs: List[CarbonMeasure] =
- table.getMeasureByTableName(table.getFactTableName)
+ table.getMeasureByTableName(table.getTableName)
allCols.addAll(msrs)
val set: Array[util.Set[String]] = Array.ofDim[util.Set[String]](dims.size)
for (i <- set.indices) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index 6b435c6..1d41664 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -147,7 +147,7 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
case logicalRelation:LogicalRelation =>
if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
- if(relation.carbonTable.getFactTableName.equalsIgnoreCase(actualTableName)) {
+ if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) {
isValidPlan = true
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
index 3f99922..df1bd2e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
@@ -51,7 +51,7 @@ class TestDDLForPartitionTable extends QueryTest with BeforeAndAfterAll {
""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_hashTable")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
assert(partitionInfo != null)
assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("empno"))
assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.INT)
@@ -74,7 +74,7 @@ class TestDDLForPartitionTable extends QueryTest with BeforeAndAfterAll {
""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_rangeTable")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
assert(partitionInfo != null)
assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj"))
assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
@@ -101,7 +101,7 @@ class TestDDLForPartitionTable extends QueryTest with BeforeAndAfterAll {
| 'LIST_INFO'='0, 1, (2, 3)')
""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTable")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
assert(partitionInfo != null)
assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("workgroupcategory"))
assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.STRING)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
index 317e2e2..c17ca00 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
@@ -45,7 +45,7 @@ class TestDDLForPartitionTableWithDefaultProperties extends QueryTest with Befo
""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_hashTable")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
assert(partitionInfo != null)
assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("empno"))
assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.INT)
@@ -68,7 +68,7 @@ class TestDDLForPartitionTableWithDefaultProperties extends QueryTest with Befo
""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_rangeTable")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
assert(partitionInfo != null)
assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj"))
assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
@@ -96,7 +96,7 @@ class TestDDLForPartitionTableWithDefaultProperties extends QueryTest with Befo
| 'DICTIONARY_INCLUDE'='projectenddate')
""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTable")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
assert(partitionInfo != null)
assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate"))
assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
@@ -128,7 +128,7 @@ class TestDDLForPartitionTableWithDefaultProperties extends QueryTest with Befo
| 'LIST_INFO'='2017-06-11 , 2017-06-13')
""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTableDate")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
assert(partitionInfo != null)
assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate"))
assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.DATE)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
index 8eb5101..51e0cc4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
@@ -51,8 +51,8 @@ object ValidateUtil {
def validateSortScope(carbonTable: CarbonTable, sortScope: String): Unit = {
if (sortScope != null) {
// Don't support use global sort on partitioned table.
- if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null &&
- sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
+ if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null &&
+ sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
throw new MalformedCarbonCommandException("Don't support use global sort on partitioned " +
"table.")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 5c6760a..37ab8c3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -46,7 +46,7 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
val oldPartitionIds = alterPartitionModel.oldPartitionIds
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val databaseName = carbonTable.getDatabaseName
- val factTableName = carbonTable.getFactTableName
+ val factTableName = carbonTable.getTableName
val partitionInfo = carbonTable.getPartitionInfo(factTableName)
override protected def getPartitions: Array[Partition] = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 9ca21bc..0fed5a7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -548,7 +548,7 @@ class PartitionTableDataLoaderRDD[K, V](
val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
val model: CarbonLoadModel = carbonLoadModel
val carbonTable = model.getCarbonDataLoadSchema.getCarbonTable
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val uniqueLoadStatusId =
carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
index a82ea00..2aa5610 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
@@ -41,7 +41,7 @@ object PartitionDropper {
val dropWithData = dropPartitionCallableModel.dropWithData
val carbonTable = dropPartitionCallableModel.carbonTable
val dbName = carbonTable.getDatabaseName
- val tableName = carbonTable.getFactTableName
+ val tableName = carbonTable.getTableName
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val partitionInfo = carbonTable.getPartitionInfo(tableName)
val partitioner = PartitionFactory.getPartitioner(partitionInfo)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
index db664b3..9106cca 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
@@ -40,7 +40,7 @@ object PartitionSplitter {
val carbonLoadModel = splitPartitionCallableModel.carbonLoadModel
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val tableName = carbonTable.getFactTableName
+ val tableName = carbonTable.getTableName
val databaseName = carbonTable.getDatabaseName
val bucketInfo = carbonTable.getBucketingInfo(tableName)
var finalSplitStatus = false
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 1b21e3d..a3572ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -55,7 +55,6 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD
@@ -516,7 +515,6 @@ object CommonUtil {
}
def readAndUpdateLoadProgressInTableMeta(model: CarbonLoadModel,
- storePath: String,
insertOverwrite: Boolean): Unit = {
val newLoadMetaEntry = new LoadMetadataDetails
val status = if (insertOverwrite) {
@@ -528,16 +526,13 @@ object CommonUtil {
// reading the start time of data load.
val loadStartTime = CarbonUpdateUtil.readCurrentTime
model.setFactTimeStamp(loadStartTime)
- CarbonLoaderUtil
- .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp, false)
+ CarbonLoaderUtil.populateNewLoadMetaEntry(
+ newLoadMetaEntry, status, model.getFactTimeStamp, false)
val entryAdded: Boolean =
CarbonLoaderUtil.recordLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite)
if (!entryAdded) {
- sys
- .error(s"Failed to add entry in table status for ${ model.getDatabaseName }.${
- model
- .getTableName
- }")
+ sys.error(s"Failed to add entry in table status for " +
+ s"${ model.getDatabaseName }.${model.getTableName}")
}
}
@@ -856,26 +851,9 @@ object CommonUtil {
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
- carbonTable.getDatabaseName, carbonTable.getFactTableName).getTablePath,
+ carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath,
segmentIds).collect()
}
}
- /**
- * can be removed with the spark 1.6 removal
- * @param tableMeta
- * @return
- */
- @deprecated
- def getTablePath(tableMeta: TableMeta): String = {
- if (tableMeta.tablePath == null) {
- tableMeta.storePath + CarbonCommonConstants.FILE_SEPARATOR +
- tableMeta.carbonTableIdentifier.getDatabaseName +
- CarbonCommonConstants.FILE_SEPARATOR + tableMeta.carbonTableIdentifier.getTableName
- }
- else {
- tableMeta.tablePath
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 35e1e78..84ad85e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -180,10 +180,10 @@ object DataLoadingUtil {
options: immutable.Map[String, String],
optionsFinal: mutable.Map[String, String],
carbonLoadModel: CarbonLoadModel): Unit = {
- carbonLoadModel.setTableName(table.getFactTableName)
+ carbonLoadModel.setTableName(table.getTableName)
carbonLoadModel.setDatabaseName(table.getDatabaseName)
carbonLoadModel.setTablePath(table.getTablePath)
- carbonLoadModel.setTableName(table.getFactTableName)
+ carbonLoadModel.setTableName(table.getTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
@@ -199,7 +199,7 @@ object DataLoadingUtil {
val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
val all_dictionary_path = optionsFinal("all_dictionary_path")
val column_dict = optionsFinal("columndict")
- ValidateUtil.validateDateFormat(dateFormat, table, table.getFactTableName)
+ ValidateUtil.validateDateFormat(dateFormat, table, table.getTableName)
ValidateUtil.validateSortScope(table, sort_scope)
if (bad_records_logger_enable.toBoolean ||
@@ -236,7 +236,7 @@ object DataLoadingUtil {
}
} else {
if (fileHeader.isEmpty) {
- fileHeader = table.getCreateOrderColumn(table.getFactTableName)
+ fileHeader = table.getCreateOrderColumn(table.getTableName)
.asScala.map(_.getColName).mkString(",")
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 975fc9b..0bf2b16 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -676,17 +676,17 @@ object GlobalDictionaryUtil {
*/
def generateGlobalDictionary(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
- storePath: String,
+ tablePath: String,
dataFrame: Option[DataFrame] = None): Unit = {
try {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
// create dictionary folder if not exists
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
// columns which need to generate global dictionary file
val dimensions = carbonTable.getDimensionByTableName(
- carbonTable.getFactTableName).asScala.toArray
+ carbonTable.getTableName).asScala.toArray
// generate global dict from pre defined column dict file
carbonLoadModel.initPredefDictMap()
@@ -701,7 +701,7 @@ object GlobalDictionaryUtil {
if (colDictFilePath != null) {
// generate predefined dictionary
generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
- dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
+ dimensions, carbonLoadModel, sqlContext, tablePath, dictfolderPath)
}
if (headers.length > df.columns.length) {
val msg = "The number of columns in the file header do not match the " +
@@ -717,7 +717,7 @@ object GlobalDictionaryUtil {
// select column to push down pruning
df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
- requireDimension, storePath, dictfolderPath, false)
+ requireDimension, tablePath, dictfolderPath, false)
// combine distinct value in a block and partition by column
val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
.partitionBy(new ColumnPartitioner(model.primDimensions.length))
@@ -731,7 +731,7 @@ object GlobalDictionaryUtil {
} else {
generateDictionaryFromDictionaryFiles(sqlContext,
carbonLoadModel,
- storePath,
+ tablePath,
carbonTableIdentifier,
dictfolderPath,
dimensions,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 756de6b..2f6b277 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -454,7 +454,7 @@ class TableNewProcessor(cm: TableModel) {
val field = cm.dimCols.find(keyDim equals _.column).get
val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) {
cm.parentTable.get.getColumnByName(
- cm.parentTable.get.getFactTableName,
+ cm.parentTable.get.getTableName,
cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName).getEncoder
} else {
val encoders = new java.util.ArrayList[Encoding]()
@@ -479,7 +479,7 @@ class TableNewProcessor(cm: TableModel) {
val encoders = if (cm.parentTable.isDefined &&
cm.dataMapRelation.get.get(field).isDefined) {
cm.parentTable.get.getColumnByName(
- cm.parentTable.get.getFactTableName,
+ cm.parentTable.get.getTableName,
cm.dataMapRelation.get.get(field).get.
columnTableRelation.get.parentColumnName).getEncoder
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1ca7456..c12d2ef 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -179,7 +179,7 @@ object CarbonDataRDDFactory {
while (null != tableForCompaction) {
LOGGER.info("Compaction request has been identified for table " +
s"${ tableForCompaction.getDatabaseName }." +
- s"${ tableForCompaction.getFactTableName}")
+ s"${ tableForCompaction.getTableName}")
val table: CarbonTable = tableForCompaction
val metadataPath = table.getMetaDataFilepath
val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
@@ -204,7 +204,7 @@ object CarbonDataRDDFactory {
case e: Exception =>
LOGGER.error("Exception in compaction thread for table " +
s"${ tableForCompaction.getDatabaseName }." +
- s"${ tableForCompaction.getFactTableName }")
+ s"${ tableForCompaction.getTableName }")
// not handling the exception. only logging as this is not the table triggered
// by user.
} finally {
@@ -216,7 +216,7 @@ object CarbonDataRDDFactory {
skipCompactionTables.+=:(tableForCompaction.getCarbonTableIdentifier)
LOGGER.error("Compaction request file can not be deleted for table " +
s"${ tableForCompaction.getDatabaseName }." +
- s"${ tableForCompaction.getFactTableName }")
+ s"${ tableForCompaction.getTableName }")
}
}
// ********* check again for all the tables.
@@ -248,7 +248,7 @@ object CarbonDataRDDFactory {
table: CarbonTable
): CarbonLoadModel = {
val loadModel = new CarbonLoadModel
- loadModel.setTableName(table.getFactTableName)
+ loadModel.setTableName(table.getTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
loadModel.setCarbonDataLoadSchema(dataLoadSchema)
@@ -319,7 +319,7 @@ object CarbonDataRDDFactory {
}
}
} else {
- status = if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
+ status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel)
} else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
@@ -782,7 +782,7 @@ object CarbonDataRDDFactory {
dataFrame: Option[DataFrame],
carbonLoadModel: CarbonLoadModel): RDD[Row] = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionColumn = partitionInfo.getColumnSchemaList.get(0).getColumnName
val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
val columns = carbonLoadModel.getCsvHeaderColumns
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 1e6a36e..47f5344 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -21,23 +21,20 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.hive.{CarbonMetaData, CarbonRelation, DictionaryMap}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.processing.merger.TableMeta
case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
object CarbonSparkUtil {
def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
- val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
+ val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getTableName)
.asScala.map(x => x.getColName) // wf : may be problem
- val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
+ val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getTableName)
.asScala.map(x => x.getColName)
val dictionary =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
+ carbonTable.getDimensionByTableName(carbonTable.getTableName).asScala.map { f =>
(f.getColName.toLowerCase,
f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
!f.getDataType.isComplexType)
@@ -47,10 +44,11 @@ object CarbonSparkUtil {
def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = {
val table = CarbonTable.buildFromTableInfo(tableInfo)
- val meta = new TableMeta(table.getCarbonTableIdentifier,
- table.getTablePath, tablePath, table)
- CarbonRelation(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName,
- CarbonSparkUtil.createSparkMeta(table), meta)
+ CarbonRelation(
+ tableInfo.getDatabaseName,
+ tableInfo.getFactTable.getTableName,
+ CarbonSparkUtil.createSparkMeta(table),
+ table)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 44fbb37..b74576d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.types._
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType}
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.CarbonOption
class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
@@ -58,7 +59,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
*/
private def loadTempCSV(options: CarbonOption): Unit = {
// temporary solution: write to csv file, then load the csv into carbon
- val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).storePath
+ val storePath = CarbonProperties.getStorePath
val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
.append("tempCSV")
.append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 22933f2..72f40ac 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -50,7 +50,7 @@ case class CarbonDatasourceHadoopRelation(
lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(paths.head,
parameters("dbname"), parameters("tablename"))
lazy val databaseName: String = carbonTable.getDatabaseName
- lazy val tableName: String = carbonTable.getFactTableName
+ lazy val tableName: String = carbonTable.getTableName
CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
@transient lazy val carbonRelation: CarbonRelation =
@@ -58,7 +58,7 @@ case class CarbonDatasourceHadoopRelation(
createCarbonRelation(parameters, identifier, sparkSession)
- @transient lazy val carbonTable: CarbonTable = carbonRelation.tableMeta.carbonTable
+ @transient lazy val carbonTable: CarbonTable = carbonRelation.carbonTable
override def sqlContext: SQLContext = sparkSession.sqlContext
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index c7db436..9d88c4c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -72,7 +72,7 @@ case class CarbonDictionaryDecoder(
attachTree(this, "execute") {
val absoluteTableIdentifiers = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
- (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
+ (carbonTable.getTableName, carbonTable.getAbsoluteTableIdentifier)
}.toMap
if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
@@ -125,7 +125,7 @@ case class CarbonDictionaryDecoder(
val absoluteTableIdentifiers = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
- (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
+ (carbonTable.getTableName, carbonTable.getAbsoluteTableIdentifier)
}.toMap
if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
@@ -323,7 +323,7 @@ object CarbonDictionaryDecoder {
if (relation.isDefined && canBeDecoded(attr, profile)) {
val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
val carbonDimension = carbonTable
- .getDimensionByName(carbonTable.getFactTableName, attr.name)
+ .getDimensionByName(carbonTable.getTableName, attr.name)
if (carbonDimension != null &&
carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
@@ -355,7 +355,7 @@ object CarbonDictionaryDecoder {
if (relation.isDefined) {
val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
val carbonDimension = carbonTable
- .getDimensionByName(carbonTable.getFactTableName, attr.name)
+ .getDimensionByName(carbonTable.getTableName, attr.name)
if (carbonDimension != null &&
carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
@@ -432,12 +432,12 @@ object CarbonDictionaryDecoder {
if (relation.isDefined && CarbonDictionaryDecoder.canBeDecoded(attr, profile)) {
val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
val carbonDimension =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+ carbonTable.getDimensionByName(carbonTable.getTableName, attr.name)
if (carbonDimension != null &&
carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
!carbonDimension.isComplex) {
- (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+ (carbonTable.getTableName, carbonDimension.getColumnIdentifier,
carbonDimension)
} else {
(null, null, null)
@@ -485,12 +485,12 @@ class CarbonDecoderRDD(
if (relation.isDefined && canBeDecoded(attr)) {
val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
val carbonDimension =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+ carbonTable.getDimensionByName(carbonTable.getTableName, attr.name)
if (carbonDimension != null &&
carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
!carbonDimension.isComplex()) {
- (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+ (carbonTable.getTableName, carbonDimension.getColumnIdentifier,
carbonDimension)
} else {
(null, null, null)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 1ee7650..dcfce0f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -20,13 +20,15 @@ package org.apache.spark.sql
import java.util.Map
import java.util.concurrent.ConcurrentHashMap
-import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonSessionCatalog}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonRelation, CarbonSessionCatalog}
import org.apache.spark.sql.internal.CarbonSQLConf
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
-import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationListenerBus}
import org.apache.carbondata.spark.rdd.SparkReadSupport
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
@@ -41,8 +43,6 @@ class CarbonEnv {
var carbonSessionInfo: CarbonSessionInfo = _
- var storePath: String = _
-
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
// set readsupport class global so that the executor can get it.
@@ -74,7 +74,7 @@ class CarbonEnv {
config.addDefaultCarbonSessionParams()
carbonMetastore = {
val properties = CarbonProperties.getInstance()
- storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
+ var storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
if (storePath == null) {
storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
@@ -112,6 +112,30 @@ object CarbonEnv {
carbonEnv
}
}
-}
+ /**
+ * Return carbon table instance by looking up relation in `sparkSession`
+ */
+ def getCarbonTable(
+ databaseNameOp: Option[String],
+ tableName: String)
+ (sparkSession: SparkSession): CarbonTable = {
+ CarbonEnv
+ .getInstance(sparkSession)
+ .carbonMetastore
+ .lookupRelation(databaseNameOp, tableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ .carbonTable
+ }
+ def getCarbonTable(
+ tableIdentifier: TableIdentifier)
+ (sparkSession: SparkSession): CarbonTable = {
+ CarbonEnv
+ .getInstance(sparkSession)
+ .carbonMetastore
+ .lookupRelation(tableIdentifier)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ .carbonTable
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 0806421..99a7c37 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -65,11 +65,11 @@ case class CarbonScan(
attributesRaw = attributeOut
}
- val columns = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName)
+ val columns = carbonTable.getCreateOrderColumn(carbonTable.getTableName)
val colAttr = new Array[Attribute](columns.size())
attributesRaw.foreach { attr =>
val column =
- carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
+ carbonTable.getColumnByName(carbonTable.getTableName, attr.name)
if(column != null) {
colAttr(columns.indexOf(column)) = attr
}
@@ -78,7 +78,7 @@ case class CarbonScan(
var queryOrder: Integer = 0
attributesRaw.foreach { attr =>
- val carbonColumn = carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
+ val carbonColumn = carbonTable.getColumnByName(carbonTable.getTableName, attr.name)
if (carbonColumn != null) {
if (carbonColumn.isDimension()) {
val dim = new QueryDimension(attr.name)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index fba590e..6331f12 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -165,7 +165,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
} else {
CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(Option(dbName), tableName)(sparkSession)
- (CarbonEnv.getInstance(sparkSession).storePath + s"/$dbName/$tableName", parameters)
+ (CarbonProperties.getStorePath + s"/$dbName/$tableName", parameters)
}
} catch {
case ex: NoSuchTableException =>
@@ -199,11 +199,10 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
if (parameters.contains("tablePath")) {
(parameters("tablePath"), parameters)
} else if (!sparkSession.isInstanceOf[CarbonSession]) {
- (CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName, parameters)
+ (CarbonProperties.getStorePath + "/" + dbName + "/" + tableName, parameters)
} else {
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- (relation.tableMeta.tablePath, parameters)
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+ (carbonTable.getTablePath, parameters)
}
} catch {
case ex: Exception =>
@@ -235,15 +234,11 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
}
if (tablePathOption.isDefined) {
val sparkSession = sqlContext.sparkSession
- val identifier: AbsoluteTableIdentifier =
- AbsoluteTableIdentifier.from(tablePathOption.get, dbName, tableName)
- val carbonTable =
- CarbonEnv.getInstance(sparkSession).carbonMetastore.
- createCarbonRelation(parameters, identifier, sparkSession).tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
if (!carbonTable.isStreamingTable) {
throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
- s"${carbonTable.getFactTableName} is not a streaming table")
+ s"${carbonTable.getTableName} is not a streaming table")
}
// create sink
@@ -314,8 +309,7 @@ object CarbonSource {
val tableName: String = properties.getOrElse("tableName", "").toLowerCase
val model = createTableInfoFromParams(properties, dataSchema, dbName, tableName)
val tableInfo: TableInfo = TableNewProcessor(model)
- val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonEnv.getInstance(sparkSession).storePath)
+ val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession, CarbonProperties.getStorePath)
val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
val schemaEvolutionEntry = new SchemaEvolutionEntry
schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
index 197b23b..f83766d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
case class CarbonCreateTableCommand(
cm: TableModel,
@@ -37,7 +37,7 @@ case class CarbonCreateTableCommand(
}
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val storePath = CarbonEnv.getInstance(sparkSession).storePath
+ val storePath = CarbonProperties.getStorePath
CarbonEnv.getInstance(sparkSession).carbonMetastore.
checkSchemasModifiedTimeAndReloadTables()
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
index 7dcad9a..b233c99 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
@@ -29,6 +29,7 @@ import org.codehaus.jackson.map.ObjectMapper
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.util.CarbonProperties
private[sql] case class CarbonDescribeFormattedCommand(
child: SparkPlan,
@@ -68,7 +69,7 @@ private[sql] case class CarbonDescribeFormattedCommand(
val colComment = field.getComment().getOrElse("null")
val comment = if (dims.contains(fieldName)) {
val dimension = relation.metaData.carbonTable.getDimensionByName(
- relation.tableMeta.carbonTableIdentifier.getTableName, fieldName)
+ relation.carbonTable.getTableName, fieldName)
if (null != dimension.getColumnProperties && !dimension.getColumnProperties.isEmpty) {
colProps.append(fieldName).append(".")
.append(mapper.writeValueAsString(dimension.getColumnProperties))
@@ -101,12 +102,11 @@ private[sql] case class CarbonDescribeFormattedCommand(
colProps.toString()
}
results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
- results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
- .getDatabaseName, "")
+ results ++= Seq(("Database Name: ", relation.carbonTable.getDatabaseName, "")
)
- results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
- results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
- val carbonTable = relation.tableMeta.carbonTable
+ results ++= Seq(("Table Name: ", relation.carbonTable.getTableName, ""))
+ results ++= Seq(("CARBON Store Path: ", CarbonProperties.getStorePath, ""))
+ val carbonTable = relation.carbonTable
// Carbon table support table comment
val tableComment = carbonTable.getTableInfo.getFactTable.getTableProperties
.getOrDefault(CarbonCommonConstants.TABLE_COMMENT, "")
@@ -122,14 +122,14 @@ private[sql] case class CarbonDescribeFormattedCommand(
results ++= Seq(("ADAPTIVE", "", ""))
}
results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns(
- relation.tableMeta.carbonTableIdentifier.getTableName).asScala
+ relation.carbonTable.getTableName).asScala
.map(column => column).mkString(","), ""))
val dimension = carbonTable
- .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+ .getDimensionByTableName(relation.carbonTable.getTableName)
results ++= getColumnGroups(dimension.asScala.toList)
- if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
+ if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
results ++=
- Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getTableName)
.getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), ""))
}
results.map {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
index 0343393..f0a916a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
@@ -30,7 +30,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.events._
@@ -50,12 +50,11 @@ case class CarbonDropTableCommand(
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
val identifier = TableIdentifier(tableName, Option(dbName))
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
val carbonEnv = CarbonEnv.getInstance(sparkSession)
val catalog = carbonEnv.carbonMetastore
val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonEnv.getInstance(sparkSession).storePath)
+ CarbonProperties.getStorePath)
val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
val absoluteTableIdentifier = AbsoluteTableIdentifier
.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
@@ -68,7 +67,7 @@ case class CarbonDropTableCommand(
LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
val carbonTable: Option[CarbonTable] =
catalog.getTableFromMetadataCache(dbName, tableName) match {
- case Some(tableMeta) => Some(tableMeta.carbonTable)
+ case Some(carbonTable) => Some(carbonTable)
case None => try {
Some(catalog.lookupRelation(identifier)(sparkSession)
.asInstanceOf[CarbonRelation].metaData.carbonTable)
@@ -131,7 +130,7 @@ case class CarbonDropTableCommand(
// delete the table folder
val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
- CarbonEnv.getInstance(sparkSession).storePath)
+ CarbonProperties.getStorePath)
val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
val metadataFilePath =