Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/18 15:29:43 UTC
[09/28] carbondata git commit: [CARBONDATA-1739] Clean up store path interface
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 153b169..07491d1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -65,7 +65,7 @@ object AlterTableUtil {
sys.error(s"Table $dbName.$tableName does not exist")
}
// acquire the lock first
- val table = relation.tableMeta.carbonTable
+ val table = relation.carbonTable
val acquiredLocks = ListBuffer[ICarbonLock]()
try {
locksToBeAcquired.foreach { lock =>
@@ -133,7 +133,7 @@ object AlterTableUtil {
thriftTable: TableInfo)(sparkSession: SparkSession,
sessionState: CarbonSessionState): Unit = {
val dbName = carbonTable.getDatabaseName
- val tableName = carbonTable.getFactTableName
+ val tableName = carbonTable.getTableName
CarbonEnv.getInstance(sparkSession).carbonMetastore
.updateTableSchemaForAlter(carbonTable.getCarbonTableIdentifier,
carbonTable.getCarbonTableIdentifier,
@@ -232,10 +232,7 @@ object AlterTableUtil {
def revertAddColumnChanges(dbName: String, tableName: String, timeStamp: Long)
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
- .carbonTable
-
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
carbonTable.getCarbonTableIdentifier)
val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -262,9 +259,7 @@ object AlterTableUtil {
def revertDropColumnChanges(dbName: String, tableName: String, timeStamp: Long)
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
- .carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
carbonTable.getCarbonTableIdentifier)
val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -297,9 +292,7 @@ object AlterTableUtil {
def revertDataTypeChanges(dbName: String, tableName: String, timeStamp: Long)
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
- .carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
carbonTable.getCarbonTableIdentifier)
val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -343,30 +336,27 @@ object AlterTableUtil {
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
var locks = List.empty[ICarbonLock]
var timeStamp = 0L
- var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
var carbonTable: CarbonTable = null
try {
locks = AlterTableUtil
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
+ carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
// get the latest carbon table
// read the latest schema file
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
carbonTable.getCarbonTableIdentifier)
val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(thriftTableInfo,
- dbName,
- tableName,
- carbonTable.getTablePath)
+ val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+ thriftTableInfo,
+ dbName,
+ tableName,
+ carbonTable.getTablePath)
val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
schemaEvolutionEntry.setTimeStamp(timeStamp)
- val thriftTable = schemaConverter
- .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+ val thriftTable = schemaConverter.fromWrapperToExternalTableInfo(
+ wrapperTableInfo, dbName, tableName)
val tblPropertiesMap: mutable.Map[String, String] =
thriftTable.fact_table.getTableProperties.asScala
if (set) {
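
The AlterTableUtil changes above show the commit's core pattern: every call site that unwrapped a CarbonRelation's TableMeta to reach the CarbonTable now goes through a single CarbonEnv helper. A minimal before/after sketch, assuming the imports already present in AlterTableUtil.scala and hypothetical myDb/myTable names:

    import org.apache.spark.sql.{CarbonEnv, SparkSession}
    import org.apache.spark.sql.hive.CarbonRelation
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    def lookupOld(sparkSession: SparkSession): CarbonTable =
      // before: resolve the relation, then unwrap tableMeta to reach the table
      CarbonEnv.getInstance(sparkSession).carbonMetastore
        .lookupRelation(Some("myDb"), "myTable")(sparkSession)
        .asInstanceOf[CarbonRelation].tableMeta.carbonTable

    def lookupNew(sparkSession: SparkSession): CarbonTable =
      // after: one helper hides the metastore and relation plumbing
      CarbonEnv.getCarbonTable(Some("myDb"), "myTable")(sparkSession)
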
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index dcfbaea..c05c0f1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -18,7 +18,6 @@
package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.api.CarbonStore
@@ -40,9 +39,7 @@ object CleanFiles {
def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
storePath: String, forceTableClean: Boolean = false): Unit = {
TableAPIUtil.validateTableExists(spark, dbName, tableName)
- val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
- lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean)
}
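
CleanFiles above and the DeleteSegment utilities that follow keep their public signatures; only the internal table lookup changes. A hypothetical invocation of the method above, with placeholder database, table, and store-path values:

    // hypothetical values; the storePath parameter itself is unchanged by this commit
    CleanFiles.cleanFiles(spark, "default", "my_table", "/tmp/carbonstore")
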
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 8375762..d682b21 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -17,7 +17,6 @@
package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.api.CarbonStore
@@ -30,9 +29,7 @@ object DeleteSegmentByDate {
def deleteSegmentByDate(spark: SparkSession, dbName: String, tableName: String,
dateValue: String): Unit = {
TableAPIUtil.validateTableExists(spark, dbName, tableName)
- val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
- lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
CarbonStore.deleteLoadByDate(dateValue, dbName, tableName, carbonTable)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index 9b87504..5b58c8d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -34,9 +34,7 @@ object DeleteSegmentById {
def deleteSegmentById(spark: SparkSession, dbName: String, tableName: String,
segmentIds: Seq[String]): Unit = {
TableAPIUtil.validateTableExists(spark, dbName, tableName)
- val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
- lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
CarbonStore.deleteLoadById(segmentIds, dbName, tableName, carbonTable)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 23cba20..287191c 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -245,7 +245,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
test("Alter table add partition: List Partition") {
sql("""ALTER TABLE list_table_area ADD PARTITION ('OutSpace', 'Hi')""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val list_info = partitionInfo.getListInfo
assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -286,7 +286,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE list_table_area DROP PARTITION(2) WITH DATA""")
val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
- val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getTableName)
val partitionIds2 = partitionInfo2.getPartitionIds
val list_info2 = partitionInfo2.getListInfo
assert(partitionIds2 == List(0, 1, 3, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -304,7 +304,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
test("Alter table add partition: Range Partition") {
sql("""ALTER TABLE range_table_logdate ADD PARTITION ('2017/01/01', '2018/01/01')""")
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val range_info = partitionInfo.getRangeInfo
assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -342,7 +342,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE range_table_logdate DROP PARTITION(3) WITH DATA;""")
val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
- val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
val partitionIds1 = partitionInfo1.getPartitionIds
val range_info1 = partitionInfo1.getRangeInfo
assert(partitionIds1 == List(0, 1, 2, 4, 5).map(Integer.valueOf(_)).asJava)
@@ -373,7 +373,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
test("Alter table split partition: List Partition") {
sql("""ALTER TABLE list_table_country SPLIT PARTITION(4) INTO ('Canada', 'Russia', '(Good, NotGood)')""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val list_info = partitionInfo.getListInfo
assert(partitionIds == List(0, 1, 2, 3, 6, 7, 8, 5).map(Integer.valueOf(_)).asJava)
@@ -415,7 +415,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE list_table_country DROP PARTITION(8)""")
val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
- val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
val partitionIds1 = partitionInfo1.getPartitionIds
val list_info1 = partitionInfo1.getListInfo
assert(partitionIds1 == List(0, 1, 2, 3, 6, 7, 5).map(Integer.valueOf(_)).asJava)
@@ -438,7 +438,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE list_table_country ADD PARTITION ('(Part1, Part2, Part3, Part4)')""".stripMargin)
sql("""ALTER TABLE list_table_country SPLIT PARTITION(9) INTO ('Part4', 'Part2', '(Part1, Part3)')""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val list_info = partitionInfo.getListInfo
assert(partitionIds == List(0, 1, 2, 3, 6, 7, 5, 10, 11, 12).map(Integer.valueOf(_)).asJava)
@@ -485,7 +485,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE list_table_area ADD PARTITION ('(One,Two, Three, Four)')""".stripMargin)
sql("""ALTER TABLE list_table_area SPLIT PARTITION(6) INTO ('One', '(Two, Three )', 'Four')""".stripMargin)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val list_info = partitionInfo.getListInfo
assert(partitionIds == List(0, 1, 3, 4, 5, 7, 8, 9).map(Integer.valueOf(_)).asJava)
@@ -528,7 +528,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
test("Alter table split partition: Range Partition") {
sql("""ALTER TABLE range_table_logdate_split SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val rangeInfo = partitionInfo.getRangeInfo
assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava)
@@ -566,7 +566,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE range_table_logdate_split DROP PARTITION(6)""")
val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
- val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
val partitionIds1 = partitionInfo1.getPartitionIds
val rangeInfo1 = partitionInfo1.getRangeInfo
assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
@@ -586,7 +586,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
test("Alter table split partition: Range Partition + Bucket") {
sql("""ALTER TABLE range_table_bucket SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val rangeInfo = partitionInfo.getRangeInfo
assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava)
@@ -624,7 +624,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE range_table_bucket DROP PARTITION(6) WITH DATA""")
val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
- val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getTableName)
val partitionIds1 = partitionInfo1.getPartitionIds
val rangeInfo1 = partitionInfo1.getRangeInfo
assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
@@ -642,7 +642,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE range_table_bucket DROP PARTITION(3)""")
val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
- val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getTableName)
val partitionIds2 = partitionInfo2.getPartitionIds
val rangeInfo2 = partitionInfo2.getRangeInfo
assert(partitionIds2 == List(0, 1, 2, 5).map(Integer.valueOf(_)).asJava)
@@ -659,7 +659,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("""ALTER TABLE range_table_bucket DROP PARTITION(5)""")
val carbonTable3 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
- val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getTableName)
val partitionIds3 = partitionInfo3.getPartitionIds
val rangeInfo3 = partitionInfo3.getRangeInfo
assert(partitionIds3 == List(0, 1, 2).map(Integer.valueOf(_)).asJava)
@@ -789,7 +789,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("ALTER TABLE carbon_table_default_db ADD PARTITION ('2017')")
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_carbon_table_default_db")
- val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionIds = partitionInfo.getPartitionIds
val range_info = partitionInfo.getRangeInfo
assert(partitionIds == List(0, 1, 2, 3).map(Integer.valueOf(_)).asJava)
@@ -809,7 +809,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
sql("ALTER TABLE carbondb.carbontable ADD PARTITION ('2017')")
val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("carbondb_carbontable")
- val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable1.getFactTableName)
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable1.getTableName)
val partitionIds1 = partitionInfo1.getPartitionIds
val range_info1 = partitionInfo1.getRangeInfo
assert(partitionIds1 == List(0, 1, 2, 3).map(Integer.valueOf(_)).asJava)
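
The test updates above are one mechanical rename: CarbonTable.getFactTableName becomes getTableName, and the diff substitutes one for the other at every call site, so a sketch of the new call shape just mirrors that, using a table name from the tests above:

    import org.apache.carbondata.core.metadata.CarbonMetadata

    // getFactTableName is removed; getTableName is the single accessor now
    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
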
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index a3024be..3f5d8c6 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -43,12 +43,12 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
header: String,
allDictFilePath: String): CarbonLoadModel = {
val carbonLoadModel = new CarbonLoadModel
- carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getTableName)
- val table = relation.tableMeta.carbonTable
+ carbonLoadModel.setTableName(relation.carbonTable.getDatabaseName)
+ carbonLoadModel.setDatabaseName(relation.carbonTable.getTableName)
+ val table = relation.carbonTable
val carbonSchema = new CarbonDataLoadSchema(table)
carbonLoadModel.setDatabaseName(table.getDatabaseName)
- carbonLoadModel.setTableName(table.getFactTableName)
+ carbonLoadModel.setTableName(table.getTableName)
carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
carbonLoadModel.setFactFilePath(filePath)
carbonLoadModel.setCsvHeader(header)
@@ -141,10 +141,8 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
test("Support generate global dictionary from all dictionary files") {
val header = "id,name,city,age"
val carbonLoadModel = buildCarbonLoadModel(sampleRelation, null, header, sampleAllDictionaryFile)
- GlobalDictionaryUtil
- .generateGlobalDictionary(sqlContext,
- carbonLoadModel,
- sampleRelation.tableMeta.storePath)
+ GlobalDictionaryUtil.generateGlobalDictionary(
+ sqlContext, carbonLoadModel, sampleRelation.carbonTable.getTablePath)
DictionaryTestCaseUtil.
checkDictionary(sampleRelation, "city", "shenzhen")
@@ -156,7 +154,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
GlobalDictionaryUtil
.generateGlobalDictionary(sqlContext,
carbonLoadModel,
- complexRelation.tableMeta.storePath)
+ complexRelation.carbonTable.getTablePath)
DictionaryTestCaseUtil.
checkDictionary(complexRelation, "channelsId", "1650")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index 930de43..4551120 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -38,9 +38,9 @@ object DictionaryTestCaseUtil {
* @param value a value of column
*/
def checkDictionary(relation: CarbonRelation, columnName: String, value: String) {
- val table = relation.tableMeta.carbonTable
- val dimension = table.getDimensionByName(table.getFactTableName, columnName)
- val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getFactTableName, "uniqueid")
+ val table = relation.carbonTable
+ val dimension = table.getDimensionByName(table.getTableName, columnName)
+ val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getTableName, "uniqueid")
val absoluteTableIdentifier = new AbsoluteTableIdentifier(table.getTablePath, tableIdentifier)
val columnIdentifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
dimension.getColumnIdentifier, dimension.getDataType,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index d37a68b..78ae384 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -151,12 +151,12 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
extColFilePath: String,
csvDelimiter: String = ","): CarbonLoadModel = {
val carbonLoadModel = new CarbonLoadModel
- carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getTableName)
- val table = relation.tableMeta.carbonTable
+ carbonLoadModel.setTableName(relation.carbonTable.getDatabaseName)
+ carbonLoadModel.setDatabaseName(relation.carbonTable.getTableName)
+ val table = relation.carbonTable
val carbonSchema = new CarbonDataLoadSchema(table)
carbonLoadModel.setDatabaseName(table.getDatabaseName)
- carbonLoadModel.setTableName(table.getFactTableName)
+ carbonLoadModel.setTableName(table.getTableName)
carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
carbonLoadModel.setFactFilePath(filePath)
carbonLoadModel.setCsvHeader(header)
@@ -198,7 +198,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
var carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
header, extColDictFilePath1)
GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
- extComplexRelation.tableMeta.storePath)
+ extComplexRelation.carbonTable.getTablePath)
// check whether the dictionary is generated
DictionaryTestCaseUtil.checkDictionary(
extComplexRelation, "deviceInformationId", "10086")
@@ -207,7 +207,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
header, extColDictFilePath2)
GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
- extComplexRelation.tableMeta.storePath)
+ extComplexRelation.carbonTable.getTablePath)
// check the old dictionary and whether the new distinct value is generated
DictionaryTestCaseUtil.checkDictionary(
extComplexRelation, "deviceInformationId", "10086")
@@ -220,7 +220,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
var carbonLoadModel = buildCarbonLoadModel(extComplexRelation, complexFilePath1,
header, extColDictFilePath3)
GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
- extComplexRelation.tableMeta.storePath)
+ extComplexRelation.carbonTable.getTablePath)
// check whether the dictionary is generated
DictionaryTestCaseUtil.checkDictionary(
extComplexRelation, "channelsId", "1421|")
@@ -229,7 +229,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
carbonLoadModel = buildCarbonLoadModel(verticalDelimiteRelation, complexFilePath2,
header2, extColDictFilePath3, "|")
GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel,
- verticalDelimiteRelation.tableMeta.storePath)
+ verticalDelimiteRelation.carbonTable.getTablePath)
// check whether the dictionary is generated
DictionaryTestCaseUtil.checkDictionary(
verticalDelimiteRelation, "channelsId", "1431,")
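
The storePath value previously lived on the now-deleted TableMeta (removed further below); these tests instead pass the table path taken from the CarbonTable directly. A sketch of the new call, assuming a CarbonRelation named relation as in the tests above:

    // tableMeta.storePath is gone; the path now comes from the CarbonTable itself
    GlobalDictionaryUtil.generateGlobalDictionary(
      sqlContext, carbonLoadModel, relation.carbonTable.getTablePath)
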
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 442d93e..71c3dc2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -182,9 +182,9 @@ public final class DataLoadProcessBuilder {
loadModel.getBadRecordsLocation());
CarbonMetadata.getInstance().addCarbonTable(carbonTable);
List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ carbonTable.getMeasureByTableName(carbonTable.getTableName());
Map<String, String> dateFormatMap =
CarbonDataProcessorUtil.getDateFormatMap(loadModel.getDateFormat());
List<DataField> dataFields = new ArrayList<>();
@@ -209,7 +209,7 @@ public final class DataLoadProcessBuilder {
}
}
configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
- configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName()));
+ configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getTableName()));
// configuration for one pass load: dictionary server info
configuration.setUseOnePass(loadModel.getUseOnePass());
configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index be3572c..65f70a0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -209,7 +209,7 @@ public class CarbonCompactionExecutor {
List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
for (CarbonDimension dim : dimensions) {
// check if dimension is deleted
QueryDimension queryDimension = new QueryDimension(dim.getColName());
@@ -220,7 +220,7 @@ public class CarbonCompactionExecutor {
List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ carbonTable.getMeasureByTableName(carbonTable.getTableName());
for (CarbonMeasure carbonMeasure : measures) {
// check if measure is deleted
QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 08b8600..c60bb24 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -308,7 +308,7 @@ public class CarbonCompactionUtil {
public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> columnCardinalityMap,
CarbonTable carbonTable, List<ColumnSchema> updatedColumnSchemaList) {
List<CarbonDimension> masterDimensions =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
List<Integer> updatedCardinalityList = new ArrayList<>(columnCardinalityMap.size());
for (CarbonDimension dimension : masterDimensions) {
Integer value = columnCardinalityMap.get(dimension.getColumnId());
@@ -321,7 +321,7 @@ public class CarbonCompactionUtil {
}
// add measures to the column schema list
List<CarbonMeasure> masterSchemaMeasures =
- carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ carbonTable.getMeasureByTableName(carbonTable.getTableName());
for (CarbonMeasure measure : masterSchemaMeasures) {
updatedColumnSchemaList.add(measure.getColumnSchema());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index c1df349..8f6d19c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1264,7 +1264,7 @@ public final class CarbonDataMergerUtil {
lockStatus = carbonLock.lockWithRetries();
if (lockStatus) {
LOGGER.info(
- "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+ "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
+ " for table status updation");
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
@@ -1284,18 +1284,18 @@ public final class CarbonDataMergerUtil {
}
} else {
LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
- .getDatabaseName() + "." + table.getFactTableName());
+ .getDatabaseName() + "." + table.getTableName());
}
} finally {
if (lockStatus) {
if (carbonLock.unlock()) {
LOGGER.info(
"Table unlocked successfully after table status updation" + table.getDatabaseName()
- + "." + table.getFactTableName());
+ + "." + table.getTableName());
} else {
LOGGER.error(
"Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
- .getFactTableName() + " during table status updation");
+ .getTableName() + " during table status updation");
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java b/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
deleted file mode 100644
index 09dbfff..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.merger;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-
-public class TableMeta implements Serializable {
-
- private static final long serialVersionUID = -1749874611119829431L;
-
- public CarbonTableIdentifier carbonTableIdentifier;
- public String storePath;
- public CarbonTable carbonTable;
- public String tablePath;
-
- public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath, String tablePath,
- CarbonTable carbonTable) {
- this.carbonTableIdentifier = carbonTableIdentifier;
- this.storePath = storePath;
- this.tablePath = tablePath;
- this.carbonTable = carbonTable;
- }
-
-}
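
Every field the deleted TableMeta carried is reachable from CarbonTable itself, which is what lets the class go. A hedged sketch of the replacement accessors, with names taken from calls elsewhere in this commit and hypothetical myDb/myTable identifiers:

    import org.apache.spark.sql.{CarbonEnv, SparkSession}
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    def tableCoordinates(sparkSession: SparkSession): Unit = {
      val table: CarbonTable = CarbonEnv.getCarbonTable(Some("myDb"), "myTable")(sparkSession)
      val identifier = table.getCarbonTableIdentifier // was tableMeta.carbonTableIdentifier
      val tablePath = table.getTablePath              // was tableMeta.tablePath (and storePath)
      val dbAndName = (table.getDatabaseName, table.getTableName)
    }
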
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
index aeddac6..36e022b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
@@ -78,7 +78,7 @@ public abstract class AbstractCarbonQueryExecutor {
List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
for (CarbonDimension dim : dimensions) {
// check if dimension is deleted
QueryDimension queryDimension = new QueryDimension(dim.getColName());
@@ -89,7 +89,7 @@ public abstract class AbstractCarbonQueryExecutor {
List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ carbonTable.getMeasureByTableName(carbonTable.getTableName());
for (CarbonMeasure carbonMeasure : measures) {
// check if measure is deleted
QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 1db414f..48c5471 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -46,7 +46,7 @@ public class RowResultProcessor {
SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
CarbonDataProcessorUtil.createLocations(tempStoreLocation);
this.segmentProperties = segProp;
- String tableName = carbonTable.getFactTableName();
+ String tableName = carbonTable.getTableName();
CarbonFactDataHandlerModel carbonFactDataHandlerModel =
CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
segProp, tableName, tempStoreLocation);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 504e7ec..75fcea3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -17,7 +17,6 @@
package org.apache.carbondata.processing.store;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -42,19 +41,15 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthEquiSplitGenerator;
import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
-import org.apache.carbondata.processing.store.file.FileManager;
-import org.apache.carbondata.processing.store.file.IFileManagerComposite;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
@@ -77,10 +72,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private CarbonFactDataWriter dataWriter;
/**
- * File manager
- */
- private IFileManagerComposite fileManager;
- /**
* total number of entries in blocklet
*/
private int entryCount;
@@ -91,11 +82,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private int pageSize;
- // This variable is true if it is dictionary dimension and its cardinality is lower than
- // property of CarbonCommonConstants.HIGH_CARDINALITY_VALUE
- // It decides whether it will do RLE encoding on data page for this dimension
- private boolean[] rleEncodingForDictDimension;
- private boolean[] isNoDictionary;
private long processedDataCount;
private ExecutorService producerExecutorService;
private List<Future<Void>> producerExecutorServiceTaskList;
@@ -130,12 +116,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private boolean processingComplete;
/**
- * boolean to check whether dimension
- * is of dictionary type or no dictionary type
- */
- private boolean[] isDictDimension;
-
- /**
* current data format version
*/
private ColumnarFormatVersion version;
@@ -146,47 +126,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel model) {
this.model = model;
initParameters(model);
-
- int numDimColumns = colGrpModel.getNoOfColumnStore() + model.getNoDictionaryCount()
- + getExpandedComplexColsCount();
- this.rleEncodingForDictDimension = new boolean[numDimColumns];
- this.isNoDictionary = new boolean[numDimColumns];
-
- int noDictStartIndex = this.colGrpModel.getNoOfColumnStore();
- // setting true value for dims of high card
- for (int i = 0; i < model.getNoDictionaryCount(); i++) {
- this.isNoDictionary[noDictStartIndex + i] = true;
- }
-
- boolean isAggKeyBlock = Boolean.parseBoolean(
- CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
- CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
- if (isAggKeyBlock) {
- int[] dimLens = model.getSegmentProperties().getDimColumnsCardinality();
- for (int i = 0; i < model.getTableSpec().getNumSimpleDimensions(); i++) {
- if (model.getSegmentProperties().getDimensions().get(i).isGlobalDictionaryEncoding()) {
- this.rleEncodingForDictDimension[i] = true;
- }
- }
-
- if (model.getDimensionCount() < dimLens.length) {
- int allColsCount = getColsCount(model.getDimensionCount());
- List<Boolean> rleWithComplex = new ArrayList<Boolean>(allColsCount);
- for (int i = 0; i < model.getDimensionCount(); i++) {
- GenericDataType complexDataType = model.getComplexIndexMap().get(i);
- if (complexDataType != null) {
- complexDataType.fillAggKeyBlock(rleWithComplex, this.rleEncodingForDictDimension);
- } else {
- rleWithComplex.add(this.rleEncodingForDictDimension[i]);
- }
- }
- this.rleEncodingForDictDimension = new boolean[allColsCount];
- for (int i = 0; i < allColsCount; i++) {
- this.rleEncodingForDictDimension[i] = rleWithComplex.get(i);
- }
- }
- }
this.version = CarbonProperties.getInstance().getFormatVersion();
StringBuffer noInvertedIdxCol = new StringBuffer();
for (CarbonDimension cd : model.getSegmentProperties().getDimensions()) {
@@ -202,13 +141,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
SortScopeOptions.SortScope sortScope = model.getSortScope();
this.colGrpModel = model.getSegmentProperties().getColumnGroupModel();
- //TODO need to pass carbon table identifier to metadata
- CarbonTable carbonTable =
- CarbonMetadata.getInstance().getCarbonTable(
- model.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + model.getTableName());
- isDictDimension =
- CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(model.getTableName()));
-
// in compaction flow the measure with decimal type will come as spark decimal.
// need to convert it to byte array.
if (model.isCompactionFlow()) {
@@ -247,19 +179,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
consumerExecutorServiceTaskList.add(consumerExecutorService.submit(consumer));
}
- private boolean[] arrangeUniqueBlockType(boolean[] aggKeyBlock) {
- int counter = 0;
- boolean[] uniqueBlock = new boolean[aggKeyBlock.length];
- for (int i = 0; i < isDictDimension.length; i++) {
- if (isDictDimension[i]) {
- uniqueBlock[i] = aggKeyBlock[counter++];
- } else {
- uniqueBlock[i] = false;
- }
- }
- return uniqueBlock;
- }
-
private void setComplexMapSurrogateIndex(int dimensionCount) {
int surrIndex = 0;
for (int i = 0; i < dimensionCount; i++) {
@@ -283,9 +202,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* @throws CarbonDataWriterException
*/
public void initialise() throws CarbonDataWriterException {
- fileManager = new FileManager();
- // todo: the fileManager seems to be useless, remove it later
- fileManager.setName(new File(model.getStoreLocation()[0]).getName());
setWritingConfiguration();
}
@@ -412,27 +328,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
}
- private int getColsCount(int columnSplit) {
- int count = 0;
- for (int i = 0; i < columnSplit; i++) {
- GenericDataType complexDataType = model.getComplexIndexMap().get(i);
- if (complexDataType != null) {
- count += complexDataType.getColsCount();
- } else count++;
- }
- return count;
- }
-
// return the number of complex column after complex columns are expanded
private int getExpandedComplexColsCount() {
return model.getExpandedComplexColsCount();
}
- // return the number of complex column
- private int getComplexColumnCount() {
- return model.getComplexIndexMap().size();
- }
-
/**
* below method will be used to close the handler
*/
@@ -519,7 +419,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
this.dataWriter = getFactDataWriter();
// initialize the channel;
this.dataWriter.initializeWriter();
- //initializeColGrpMinMax();
}
/**
@@ -571,14 +470,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
carbonDataWriterVo.setStoreLocation(model.getStoreLocation());
carbonDataWriterVo.setMeasureCount(model.getMeasureCount());
carbonDataWriterVo.setTableName(model.getTableName());
- carbonDataWriterVo.setFileManager(fileManager);
- carbonDataWriterVo.setRleEncodingForDictDim(rleEncodingForDictDimension);
- carbonDataWriterVo.setIsComplexType(isComplexTypes());
carbonDataWriterVo.setNoDictionaryCount(model.getNoDictionaryCount());
carbonDataWriterVo.setCarbonDataFileAttributes(model.getCarbonDataFileAttributes());
carbonDataWriterVo.setDatabaseName(model.getDatabaseName());
carbonDataWriterVo.setWrapperColumnSchemaList(model.getWrapperColumnSchema());
- carbonDataWriterVo.setIsDictionaryColumn(isDictDimension);
carbonDataWriterVo.setCarbonDataDirectoryPath(model.getCarbonDataDirectoryPath());
carbonDataWriterVo.setColCardinality(model.getColCardinality());
carbonDataWriterVo.setSegmentProperties(model.getSegmentProperties());
@@ -590,31 +485,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
return carbonDataWriterVo;
}
- private boolean[] isComplexTypes() {
- int noDictionaryCount = model.getNoDictionaryCount();
- int noOfColumn = colGrpModel.getNoOfColumnStore() + noDictionaryCount + getComplexColumnCount();
- int allColsCount = getColsCount(noOfColumn);
- boolean[] isComplexType = new boolean[allColsCount];
-
- List<Boolean> complexTypesList = new ArrayList<Boolean>(allColsCount);
- for (int i = 0; i < noOfColumn; i++) {
- GenericDataType complexDataType = model.getComplexIndexMap().get(i - noDictionaryCount);
- if (complexDataType != null) {
- int count = complexDataType.getColsCount();
- for (int j = 0; j < count; j++) {
- complexTypesList.add(true);
- }
- } else {
- complexTypesList.add(false);
- }
- }
- for (int i = 0; i < allColsCount; i++) {
- isComplexType[i] = complexTypesList.get(i);
- }
-
- return isComplexType;
- }
-
/**
* This method will reset the block processing count
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
deleted file mode 100644
index ddd9bf2..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileData.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.file;
-
-
-public class FileData extends FileManager {
-
- /**
- * Store Path
- */
- private String storePath;
-
- /**
- * hierarchyValueWriter
- */
-
- public FileData(String fileName, String storePath) {
- this.fileName = fileName;
- this.storePath = storePath;
- }
-
- /**
- * @return Returns the carbonDataFileTempPath.
- */
- public String getFileName() {
- return fileName;
- }
-
- /**
- * @return Returns the storePath.
- */
- public String getStorePath() {
- return storePath;
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
deleted file mode 100644
index cfa3a66..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/file/FileManager.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.file;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-public class FileManager implements IFileManagerComposite {
- /**
- * listOfFileData, composite parent which holds the different objects
- */
- protected List<IFileManagerComposite> listOfFileData =
- new ArrayList<IFileManagerComposite>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
- protected String fileName;
-
- @Override public void add(IFileManagerComposite customData) {
- listOfFileData.add(customData);
- }
-
- @Override public void remove(IFileManagerComposite customData) {
- listOfFileData.remove(customData);
-
- }
-
- @Override public IFileManagerComposite get(int i) {
- return listOfFileData.get(i);
- }
-
- @Override public void setName(String name) {
- this.fileName = name;
- }
-
- /**
- * Return the size
- */
- public int size() {
- return listOfFileData.size();
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java b/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
deleted file mode 100644
index 6691772..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/file/IFileManagerComposite.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.file;
-
-public interface IFileManagerComposite {
- /**
- * Add the data which can be either row Folder(Composite) or File
- *
- * @param customData
- */
- void add(IFileManagerComposite customData);
-
- /**
- * Remove the CustomData type object from the IFileManagerComposite object hierarchy.
- *
- * @param customData
- */
- void remove(IFileManagerComposite customData);
-
- /**
- * get the CustomData type object name
- *
- * @return CustomDataIntf type
- */
- IFileManagerComposite get(int i);
-
- /**
- * set the CustomData type object name
- *
- * @param name
- */
- void setName(String name);
-
- /**
- * Get the size
- *
- * @return
- */
- int size();
-
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 1b6ba72..855ec03 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -60,7 +60,6 @@ import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.BlockletInfo3;
import org.apache.carbondata.format.IndexHeader;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
-import org.apache.carbondata.processing.store.file.FileData;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.io.IOUtils;
@@ -317,9 +316,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
.getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
dataWriterVo.getBucketNumber(), dataWriterVo.getTaskExtension(),
"" + dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
- String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
- FileData fileData = new FileData(actualFileNameVal, chosenTempLocation);
- dataWriterVo.getFileManager().add(fileData);
this.carbonDataFileTempPath = chosenTempLocation + File.separator
+ carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
this.fileCount++;
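
With the composite gone, AbstractFactDataWriter stops registering each in-progress file with a file manager; the only bookkeeping left is the temporary path carrying the .inprogress suffix. A sketch of the lifecycle this implies, where the completion step is an assumption and not part of this hunk:

    // What remains in the writer (from the hunk above), plus an assumed
    // completion step that this hunk does not show:
    String tempPath = chosenTempLocation + File.separator
        + carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
    // ... blocklets are written to tempPath ...
    String finalPath = tempPath.substring(
        0, tempPath.length() - CarbonCommonConstants.FILE_INPROGRESS_STATUS.length());
    // the finished file would then be renamed from tempPath to finalPath
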
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 26fff09..79cdd95 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -22,7 +22,6 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.file.IFileManagerComposite;
/**
* Value object for writing the data
@@ -35,12 +34,6 @@ public class CarbonDataWriterVo {
private String tableName;
- private IFileManagerComposite fileManager;
-
- private boolean[] rleEncodingForDictDim;
-
- private boolean[] isComplexType;
-
private int NoDictionaryCount;
private CarbonDataFileAttributes carbonDataFileAttributes;
@@ -49,8 +42,6 @@ public class CarbonDataWriterVo {
private List<ColumnSchema> wrapperColumnSchemaList;
- private boolean[] isDictionaryColumn;
-
private String carbonDataDirectoryPath;
private int[] colCardinality;
@@ -110,48 +101,6 @@ public class CarbonDataWriterVo {
}
/**
- * @return the fileManager
- */
- public IFileManagerComposite getFileManager() {
- return fileManager;
- }
-
- /**
- * @param fileManager the fileManager to set
- */
- public void setFileManager(IFileManagerComposite fileManager) {
- this.fileManager = fileManager;
- }
-
- /**
- * @return the rleEncodingForDictDim
- */
- public boolean[] getRleEncodingForDictDim() {
- return rleEncodingForDictDim;
- }
-
- /**
- * @param rleEncodingForDictDim the rleEncodingForDictDim to set
- */
- public void setRleEncodingForDictDim(boolean[] rleEncodingForDictDim) {
- this.rleEncodingForDictDim = rleEncodingForDictDim;
- }
-
- /**
- * @return the isComplexType
- */
- public boolean[] getIsComplexType() {
- return isComplexType;
- }
-
- /**
- * @param isComplexType the isComplexType to set
- */
- public void setIsComplexType(boolean[] isComplexType) {
- this.isComplexType = isComplexType;
- }
-
- /**
* @return the noDictionaryCount
*/
public int getNoDictionaryCount() {
@@ -208,20 +157,6 @@ public class CarbonDataWriterVo {
}
/**
- * @return the isDictionaryColumn
- */
- public boolean[] getIsDictionaryColumn() {
- return isDictionaryColumn;
- }
-
- /**
- * @param isDictionaryColumn the isDictionaryColumn to set
- */
- public void setIsDictionaryColumn(boolean[] isDictionaryColumn) {
- this.isDictionaryColumn = isDictionaryColumn;
- }
-
- /**
* @return the carbonDataDirectoryPath
*/
public String getCarbonDataDirectoryPath() {
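
CarbonDataWriterVo correspondingly drops four members that no longer had readers: fileManager, rleEncodingForDictDim, isComplexType, and isDictionaryColumn. Any straggler still calling the removed setters now fails at compile time, which is how leftover usages surface. A hypothetical caller under the slimmed-down VO (each setter name is an assumption mirrored from a getter visible in the diff):

    // Sketch of configuring the reduced value object; setter names assumed.
    CarbonDataWriterVo vo = new CarbonDataWriterVo();
    vo.setTableName("sales");
    vo.setNoDictionaryCount(2);
    vo.setCarbonDataDirectoryPath("/store/default/sales/Fact/Part0/Segment_0");
    // vo.setFileManager(...);          // removed by this commit: no longer compiles
    // vo.setIsDictionaryColumn(...);   // removed by this commit: no longer compiles
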
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index ca40830..7218a12 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -383,7 +383,7 @@ public final class CarbonDataProcessorUtil {
*/
public static Set<String> getSchemaColumnNames(CarbonDataLoadSchema schema, String tableName) {
Set<String> columnNames = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- String factTableName = schema.getCarbonTable().getFactTableName();
+ String factTableName = schema.getCarbonTable().getTableName();
if (tableName.equals(factTableName)) {
List<CarbonDimension> dimensions =
schema.getCarbonTable().getDimensionByTableName(factTableName);
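
This hunk is the first of several in the remainder of the patch that switch call sites from getFactTableName() to getTableName(); the rename is mechanical and the returned value is unchanged. When such a rename has to be staged rather than landed atomically, a deprecated bridge is the usual device (a sketch only; this commit instead updates every caller directly):

    // Hypothetical bridge method on CarbonTable for a staged rename --
    // not what this commit does:
    @Deprecated
    public String getFactTableName() {
      return getTableName();
    }
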
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 29a979d..db3442e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -263,6 +263,11 @@ public final class CarbonLoaderUtil {
AbsoluteTableIdentifier absoluteTableIdentifier =
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
+ String metadataPath = carbonTablePath.getMetadataDirectoryPath();
+ FileType fileType = FileFactory.getFileType(metadataPath);
+ if (!FileFactory.isFileExist(metadataPath, fileType)) {
+ FileFactory.mkdirs(metadataPath, fileType);
+ }
String tableStatusPath = carbonTablePath.getTableStatusFilePath();
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
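
The added guard creates the table's Metadata directory before the table status file is written, so the first load against a freshly created table path no longer depends on an earlier step having made it. Since FileFactory.mkdirs returns a boolean and the check-then-create pair is not atomic, a stricter variant would surface a failed creation while tolerating a concurrent creator (a sketch under those assumptions; both FileFactory calls can throw IOException):

    String metadataPath = carbonTablePath.getMetadataDirectoryPath();
    FileFactory.FileType fileType = FileFactory.getFileType(metadataPath);
    if (!FileFactory.isFileExist(metadataPath, fileType)
        && !FileFactory.mkdirs(metadataPath, fileType)) {
      // mkdirs can lose a race to a concurrent loader; re-check before failing.
      if (!FileFactory.isFileExist(metadataPath, fileType)) {
        throw new IOException("Unable to create metadata directory: " + metadataPath);
      }
    }
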
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 58cc019..e09e3db 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -297,9 +297,9 @@ public class StoreCreator {
String header = reader.readLine();
String[] split = header.split(",");
List<CarbonColumn> allCols = new ArrayList<CarbonColumn>();
- List<CarbonDimension> dims = table.getDimensionByTableName(table.getFactTableName());
+ List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName());
allCols.addAll(dims);
- List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getFactTableName());
+ List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getTableName());
allCols.addAll(msrs);
Set<String>[] set = new HashSet[dims.size()];
for (int i = 0; i < set.length; i++) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 943858d..7682437 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -66,7 +66,7 @@ public class StreamSegment {
try {
if (carbonLock.lockWithRetries()) {
LOGGER.info(
- "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+ "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
+ " for stream table get or create segment");
LoadMetadataDetails[] details =
@@ -104,17 +104,17 @@ public class StreamSegment {
} else {
LOGGER.error(
"Not able to acquire the lock for stream table get or create segment for table " + table
- .getDatabaseName() + "." + table.getFactTableName());
+ .getDatabaseName() + "." + table.getTableName());
throw new IOException("Failed to get stream segment");
}
} finally {
if (carbonLock.unlock()) {
LOGGER.info("Table unlocked successfully after stream table get or create segment" + table
- .getDatabaseName() + "." + table.getFactTableName());
+ .getDatabaseName() + "." + table.getTableName());
} else {
LOGGER.error(
"Unable to unlock table lock for stream table" + table.getDatabaseName() + "." + table
- .getFactTableName() + " during stream table get or create segment");
+ .getTableName() + " during stream table get or create segment");
}
}
}
@@ -132,7 +132,7 @@ public class StreamSegment {
try {
if (carbonLock.lockWithRetries()) {
LOGGER.info(
- "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+ "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
+ " for stream table finish segment");
LoadMetadataDetails[] details =
@@ -165,17 +165,17 @@ public class StreamSegment {
} else {
LOGGER.error(
"Not able to acquire the lock for stream table status updation for table " + table
- .getDatabaseName() + "." + table.getFactTableName());
+ .getDatabaseName() + "." + table.getTableName());
throw new IOException("Failed to get stream segment");
}
} finally {
if (carbonLock.unlock()) {
LOGGER.info(
"Table unlocked successfully after table status updation" + table.getDatabaseName()
- + "." + table.getFactTableName());
+ + "." + table.getTableName());
} else {
LOGGER.error("Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
- .getFactTableName() + " during table status updation");
+ .getTableName() + " during table status updation");
}
}
}
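
Apart from the log-message rename, both StreamSegment operations follow the same locking discipline: take the table status lock, mutate the segment metadata, and release in a finally block so that a failed unlock is at least logged. Reduced to its shape, using the types visible in this patch and eliding the work done under the lock:

    ICarbonLock statusLock =
        new SegmentStatusManager(absoluteTableIdentifier).getTableStatusLock();
    try {
      if (!statusLock.lockWithRetries()) {
        throw new IOException("Failed to get stream segment");
      }
      // read LoadMetadataDetails[], update it, and rewrite tablestatus here
    } finally {
      if (!statusLock.unlock()) {
        LOGGER.error("Unable to unlock table status lock");
      }
    }
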
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 31ed1f6..2c4d35f 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -147,7 +147,7 @@ object StreamSinkFactory {
val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, parameters)
optionsFinal.put("sort_scope", "no_sort")
if (parameters.get("fileheader").isEmpty) {
- optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getFactTableName)
+ optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName)
.asScala.map(_.getColName).mkString(","))
}
val carbonLoadModel = new CarbonLoadModel()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
index 6ee3296..c2789f4 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
@@ -42,14 +42,14 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
LockUsage.STREAMING_LOCK)
if (lock.lockWithRetries()) {
LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName + "." +
- carbonTable.getFactTableName)
+ carbonTable.getTableName)
cache.put(event.id, lock)
} else {
LOGGER.error("Not able to acquire the lock for stream table:" +
- carbonTable.getDatabaseName + "." + carbonTable.getFactTableName)
+ carbonTable.getDatabaseName + "." + carbonTable.getTableName)
throw new InterruptedException(
"Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
- carbonTable.getFactTableName)
+ carbonTable.getTableName)
}
}
}
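
The listener holds a per-query STREAMING_LOCK for the lifetime of the stream, keyed in a cache by the query id, and refuses to proceed without it. The matching release is presumably performed when the query terminates; that handler is outside this hunk, so the following is an assumption about its shape rather than the listener's actual code:

    // Java-flavored sketch of the assumed release path (the listener itself
    // is Scala, and its termination handler is not shown in this hunk):
    ICarbonLock lock = cache.remove(queryId);   // queryId: id of the terminated query
    if (lock != null && lock.unlock()) {
      LOGGER.info("Released streaming lock for query " + queryId);
    }
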