You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/14 17:50:34 UTC
[12/18] carbondata git commit: [CARBONDATA-1573] [Integration]
Support Database Location Configuration while Creating Database / Support
Creation of carbon Table in the database location
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 24996ed..357a812 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -90,18 +90,17 @@ trait CarbonMetaStore {
*
* @param carbonTableIdentifier
* @param thriftTableInfo
- * @param tablePath
+ * @param absoluteTableIdentifier
* @param sparkSession
*/
def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
- tablePath: String)
+ absoluteTableIdentifier: AbsoluteTableIdentifier)
(sparkSession: SparkSession): String
- def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier: CarbonTableIdentifier,
- thriftTableInfo: org.apache.carbondata.format.TableInfo,
- tablePath: String)(sparkSession: SparkSession): String
+ def revertTableSchemaForPreAggCreationFailure(absoluteTableIdentifier: AbsoluteTableIdentifier,
+ thriftTableInfo: org.apache.carbondata.format.TableInfo)(sparkSession: SparkSession): String
/**
* Prepare Thrift Schema from wrapper TableInfo and write to disk
*/
@@ -113,7 +112,7 @@ trait CarbonMetaStore {
* @return
*/
def generateTableSchemaString(tableInfo: schema.table.TableInfo,
- tablePath: String): String
+ absoluteTableIdentifier: AbsoluteTableIdentifier): String
/**
* This method will remove the table meta from catalog metadata array
@@ -128,12 +127,12 @@ trait CarbonMetaStore {
def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean
- def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
+ def dropTable(tableIdentifier: AbsoluteTableIdentifier)
(sparkSession: SparkSession)
- def updateAndTouchSchemasUpdatedTime(basePath: String)
+ def updateAndTouchSchemasUpdatedTime()
- def checkSchemasModifiedTimeAndReloadTables(storePath: String)
+ def checkSchemasModifiedTimeAndReloadTables()
def isReadFromHiveMetaStore : Boolean
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 825f6ed..e587395 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -108,7 +108,7 @@ class CarbonSessionCatalog(
var isRefreshed = false
val storePath = CarbonEnv.getInstance(sparkSession).storePath
carbonEnv.carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(storePath)
+ checkSchemasModifiedTimeAndReloadTables()
val tableMeta = carbonEnv.carbonMetastore
.getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 7d25efd..b76b24e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.hive.execution.command
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.command._
@@ -38,15 +39,23 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
if (sparkSession.sessionState.catalog.listDatabases().exists(_.equalsIgnoreCase(dbName))) {
tablesInDB = sparkSession.sessionState.catalog.listTables(dbName)
}
+ var databaseLocation = ""
+ try {
+ databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+ CarbonEnv.getInstance(sparkSession).storePath)
+ } catch {
+ case e: NoSuchDatabaseException =>
+ // ignore the exception as exception will be handled by hive command.run
+ databaseLocation = CarbonEnv.getInstance(sparkSession).storePath
+ }
// DropHiveDB command will fail if cascade is false and one or more table exists in database
- val rows = command.run(sparkSession)
if (command.cascade && tablesInDB != null) {
tablesInDB.foreach { tableName =>
CarbonDropTableCommand(true, tableName.database, tableName.table).run(sparkSession)
}
}
- CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase,
- CarbonEnv.getInstance(sparkSession).storePath)
+ CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase, databaseLocation)
+ val rows = command.run(sparkSession)
rows
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index bda4eeb..153b169 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
+import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -30,14 +31,13 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock}
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.format.TableInfo
object AlterTableUtil {
@@ -69,7 +69,7 @@ object AlterTableUtil {
val acquiredLocks = ListBuffer[ICarbonLock]()
try {
locksToBeAcquired.foreach { lock =>
- acquiredLocks += CarbonLockUtil.getLockObject(table.getCarbonTableIdentifier, lock)
+ acquiredLocks += CarbonLockUtil.getLockObject(table.getAbsoluteTableIdentifier, lock)
}
acquiredLocks.toList
} catch {
@@ -102,15 +102,14 @@ object AlterTableUtil {
* @param locksAcquired
* @param dbName
* @param tableName
- * @param storeLocation
+ * @param tablePath
*/
def releaseLocksManually(locks: List[ICarbonLock],
locksAcquired: List[String],
dbName: String,
tableName: String,
- storeLocation: String): Unit = {
- val lockLocation = storeLocation + CarbonCommonConstants.FILE_SEPARATOR +
- dbName + CarbonCommonConstants.FILE_SEPARATOR + tableName
+ tablePath: String): Unit = {
+ val lockLocation = tablePath
locks.zip(locksAcquired).foreach { case (carbonLock, lockType) =>
val lockFilePath = lockLocation + CarbonCommonConstants.FILE_SEPARATOR +
lockType
@@ -185,20 +184,23 @@ object AlterTableUtil {
*/
def revertRenameTableChanges(oldTableIdentifier: TableIdentifier,
newTableName: String,
- storePath: String,
+ tablePath: String,
tableId: String,
timeStamp: Long)
(sparkSession: SparkSession): Unit = {
val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+ val oldCarbonTableIdentifier = new CarbonTableIdentifier(database,
+ oldTableIdentifier.table, tableId)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, oldCarbonTableIdentifier)
val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, newCarbonTableIdentifier)
+ val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newCarbonTableIdentifier)
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val tableMetadataFile = carbonTablePath.getPath
val fileType = FileFactory.getFileType(tableMetadataFile)
if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
val tableInfo = if (metastore.isReadFromHiveMetaStore) {
// In case of hive metastore we first update the carbonschema inside old table only.
- metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(storePath,
+ metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(tablePath,
new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)))(sparkSession)
} else {
metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -210,9 +212,10 @@ object AlterTableUtil {
FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
.renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
oldTableIdentifier.table)
- val tableIdentifier = new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)
- metastore.revertTableSchemaInAlterFailure(tableIdentifier,
- tableInfo, carbonTablePath.getPath)(sparkSession)
+ val absoluteTableIdentifier = new AbsoluteTableIdentifier(newTablePath,
+ newCarbonTableIdentifier)
+ metastore.revertTableSchemaInAlterFailure(oldCarbonTableIdentifier,
+ tableInfo, absoluteTableIdentifier)(sparkSession)
metastore.removeTableFromMetadata(database, newTableName)
}
}
@@ -233,7 +236,7 @@ object AlterTableUtil {
.lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
.carbonTable
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
carbonTable.getCarbonTableIdentifier)
val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
@@ -244,7 +247,7 @@ object AlterTableUtil {
thriftTable.fact_table.table_columns.removeAll(addedSchemas)
metastore
.revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
- thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+ thriftTable, carbonTable.getAbsoluteTableIdentifier)(sparkSession)
}
}
@@ -262,7 +265,7 @@ object AlterTableUtil {
val carbonTable = metastore
.lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
.carbonTable
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
carbonTable.getCarbonTableIdentifier)
val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
@@ -279,7 +282,7 @@ object AlterTableUtil {
}
metastore
.revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
- thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+ thriftTable, carbonTable.getAbsoluteTableIdentifier)(sparkSession)
}
}
@@ -297,7 +300,7 @@ object AlterTableUtil {
val carbonTable = metastore
.lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
.carbonTable
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
carbonTable.getCarbonTableIdentifier)
val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
@@ -317,7 +320,7 @@ object AlterTableUtil {
}
metastore
.revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
- thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+ thriftTable, carbonTable.getAbsoluteTableIdentifier)(sparkSession)
}
}
@@ -351,7 +354,7 @@ object AlterTableUtil {
.tableMeta.carbonTable
// get the latest carbon table
// read the latest schema file
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
carbonTable.getCarbonTableIdentifier)
val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
@@ -359,7 +362,7 @@ object AlterTableUtil {
.fromExternalToWrapperTableInfo(thriftTableInfo,
dbName,
tableName,
- carbonTable.getStorePath)
+ carbonTable.getTablePath)
val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
schemaEvolutionEntry.setTimeStamp(timeStamp)
val thriftTable = schemaConverter
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index 421cd2e..dcfbaea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -61,7 +61,7 @@ object CleanFiles {
}
val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
CarbonEnv.getInstance(spark).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
+ checkSchemasModifiedTimeAndReloadTables()
cleanFiles(spark, dbName, tableName, storePath, forceTableClean)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 91121ce..709f474 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -58,7 +58,7 @@ object Compaction {
val compactionType = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
CarbonEnv.getInstance(spark).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
+ checkSchemasModifiedTimeAndReloadTables()
compaction(spark, dbName, tableName, compactionType)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 4aaec8f..8375762 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -48,7 +48,7 @@ object DeleteSegmentByDate {
val dateValue = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
CarbonEnv.getInstance(spark).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
+ checkSchemasModifiedTimeAndReloadTables()
deleteSegmentByDate(spark, dbName, tableName, dateValue)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index c86c7f5..9b87504 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -53,7 +53,7 @@ object DeleteSegmentById {
val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
CarbonEnv.getInstance(spark).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
+ checkSchemasModifiedTimeAndReloadTables()
deleteSegmentById(spark, dbName, tableName, segmentIds)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 501402b..13883ac 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -81,7 +81,7 @@ object TableLoader {
val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
CarbonEnv.getInstance(spark).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
+ checkSchemasModifiedTimeAndReloadTables()
loadTable(spark, Option(dbName), tableName, inputPaths, map)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 04de9a3..23cba20 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -843,8 +843,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
}
def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
- val tablePath = new CarbonTablePath(carbonTable.getStorePath, carbonTable.getDatabaseName,
- carbonTable.getFactTableName)
+ val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+ carbonTable.getTablePath)
val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 5c7d451..a3024be 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -64,7 +64,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
// Create table and metadata folders if not exist
val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(table.getStorePath, table.getCarbonTableIdentifier)
+ .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier)
val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
val fileType = FileFactory.getFileType(metadataDirectoryPath)
if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index fd3b2cd..930de43 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.test.TestQueryExecutor
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -41,11 +41,12 @@ object DictionaryTestCaseUtil {
val table = relation.tableMeta.carbonTable
val dimension = table.getDimensionByName(table.getFactTableName, columnName)
val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getFactTableName, "uniqueid")
- val columnIdentifier = new DictionaryColumnUniqueIdentifier(tableIdentifier,
+ val absoluteTableIdentifier = new AbsoluteTableIdentifier(table.getTablePath, tableIdentifier)
+ val columnIdentifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
dimension.getColumnIdentifier, dimension.getDataType,
CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
)
- val dict = CarbonLoaderUtil.getDictionary(columnIdentifier, TestQueryExecutor.storeLocation)
+ val dict = CarbonLoaderUtil.getDictionary(columnIdentifier)
assert(dict.getSurrogateKey(value) != CarbonCommonConstants.INVALID_SURROGATE_KEY)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 399665f..d37a68b 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -177,7 +177,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
carbonLoadModel.setMaxColumns("100")
// Create table and metadata folders if not exist
val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(table.getStorePath, table.getCarbonTableIdentifier)
+ .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier)
val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
val fileType = FileFactory.getFileType(metadataDirectoryPath)
if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index deb6287..3fb1424 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -196,7 +196,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
thread1.start()
// use thread pool to catch the exception of sink thread
val pool = Executors.newSingleThreadExecutor()
- val thread2 = createSocketStreamingThread(spark, tablePath)
+ val thread2 = createSocketStreamingThread(spark, tablePath, identifier)
val future = pool.submit(thread2)
Thread.sleep(1000)
thread1.interrupt()
@@ -242,7 +242,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
val csvDataDir = new File("target/csvdata").getCanonicalPath
// streaming ingest 10 rows
generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
- val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1)
+ val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1,
+ identifier )
thread.start()
Thread.sleep(2000)
generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir)
@@ -636,6 +637,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
def createSocketStreamingThread(
spark: SparkSession,
tablePath: CarbonTablePath,
+ tableIdentifier: TableIdentifier,
badRecordAction: String = "force",
intervalSecond: Int = 2,
handoffSize: Long = CarbonStreamOutputFormat.HANDOFF_SIZE_DEFAULT): Thread = {
@@ -656,6 +658,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
.option("checkpointLocation", tablePath.getStreamingCheckpointDir)
.option("tablePath", tablePath.getPath)
.option("bad_records_action", badRecordAction)
+ .option("dbName", tableIdentifier.database.get)
+ .option("tableName", tableIdentifier.table)
.option(CarbonStreamOutputFormat.HANDOFF_SIZE, handoffSize)
.start()
qry.awaitTermination()
@@ -698,7 +702,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
badRecords = generateBadRecords)
val thread2 = createSocketStreamingThread(
spark = spark,
- tablePath = tablePath,
+ tablePath = tablePath, identifier,
badRecordAction = badRecordAction,
intervalSecond = intervalOfIngest,
handoffSize = handoffSize)
@@ -740,7 +744,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
spark: SparkSession,
tablePath: CarbonTablePath,
csvDataDir: String,
- intervalSecond: Int): Thread = {
+ intervalSecond: Int,
+ tableIdentifier: TableIdentifier): Thread = {
new Thread() {
override def run(): Unit = {
val inputSchema = new StructType()
@@ -765,6 +770,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
.trigger(ProcessingTime(s"${ intervalSecond } seconds"))
.option("checkpointLocation", tablePath.getStreamingCheckpointDir)
.option("tablePath", tablePath.getPath)
+ .option("dbName", tableIdentifier.database.get)
+ .option("tableName", tableIdentifier.table)
.start()
qry.awaitTermination()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index e574d7f..3b6b85d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -437,7 +437,7 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl
test("test to check if the lock file is successfully deleted") {
sql("create table lock_check(id int, name string) stored by 'carbondata'")
sql("alter table lock_check rename to lock_rename")
- assert(!new File(s"${ CarbonCommonConstants.STORE_LOCATION } + /default/lock_rename/meta.lock")
+ assert(!new File(s"${ CarbonCommonConstants.STORE_LOCATION } + /lock_rename/meta.lock")
.exists())
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index adf4574..9f89226 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -26,7 +26,8 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
class CarbonCommandSuite extends Spark2QueryTest with BeforeAndAfterAll {
@@ -115,7 +116,8 @@ class CarbonCommandSuite extends Spark2QueryTest with BeforeAndAfterAll {
createAndLoadTestTable(table, "csv_table")
DeleteSegmentById.main(Array(s"${location}", table, "0"))
CleanFiles.main(Array(s"${location}", table))
- val tablePath = s"${location}${File.separator}default${File.separator}$table"
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_"+table)
+ val tablePath = carbonTable.getAbsoluteTableIdentifier.getTablePath
val f = new File(s"$tablePath/Fact/Part0")
assert(f.isDirectory)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 95d7d2e..f70e38e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -122,16 +122,16 @@ public class PrimitiveDataType implements GenericDataType<Object> {
*/
public PrimitiveDataType(String name, String parentname, String columnId,
CarbonDimension carbonDimension, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
- CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
- String storePath, Map<Object, Integer> localCache) {
+ AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
+ Map<Object, Integer> localCache) {
this.name = name;
this.parentname = parentname;
this.columnId = columnId;
this.carbonDimension = carbonDimension;
DictionaryColumnUniqueIdentifier identifier =
- new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
+ new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
carbonDimension.getColumnIdentifier(), carbonDimension.getDataType(),
- CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier));
+ CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier));
try {
if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory
@@ -139,13 +139,14 @@ public class PrimitiveDataType implements GenericDataType<Object> {
} else {
Dictionary dictionary = null;
if (useOnePass) {
- if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
+ if (CarbonUtil.isFileExistsForGivenColumn(identifier)) {
dictionary = cache.get(identifier);
}
DictionaryMessage dictionaryMessage = new DictionaryMessage();
dictionaryMessage.setColumnName(carbonDimension.getColName());
// for table initialization
- dictionaryMessage.setTableUniqueId(carbonTableIdentifier.getTableId());
+ dictionaryMessage
+ .setTableUniqueId(absoluteTableIdentifier.getCarbonTableIdentifier().getTableId());
dictionaryMessage.setData("0");
// for generate dictionary
dictionaryMessage.setType(DictionaryMessageType.DICT_GENERATION);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 05104a2..442d93e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -142,7 +142,7 @@ public final class DataLoadProcessBuilder {
CarbonProperties.getInstance().addProperty(tempLocationKey,
StringUtils.join(storeLocation, File.pathSeparator));
CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getStorePath());
+ .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getTablePath());
return createConfiguration(loadModel);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
index 7045101..4ac8850 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.dictionary.client.DictionaryClient;
import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -65,8 +65,8 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
public DictionaryFieldConverterImpl(DataField dataField,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
- CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index,
- DictionaryClient client, boolean useOnePass, String storePath,
+ AbsoluteTableIdentifier absoluteTableIdentifier, String nullFormat, int index,
+ DictionaryClient client, boolean useOnePass,
Map<Object, Integer> localCache, boolean isEmptyBadRecord,
DictionaryColumnUniqueIdentifier identifier) throws IOException {
this.index = index;
@@ -76,13 +76,14 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
// if use one pass, use DictionaryServerClientDictionary
if (useOnePass) {
- if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
+ if (CarbonUtil.isFileExistsForGivenColumn(identifier)) {
dictionary = cache.get(identifier);
}
dictionaryMessage = new DictionaryMessage();
dictionaryMessage.setColumnName(dataField.getColumn().getColName());
// for table initialization
- dictionaryMessage.setTableUniqueId(carbonTableIdentifier.getTableId());
+ dictionaryMessage
+ .setTableUniqueId(absoluteTableIdentifier.getCarbonTableIdentifier().getTableId());
dictionaryMessage.setData("0");
// for generate dictionary
dictionaryMessage.setType(DictionaryMessageType.DICT_GENERATION);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 22d15d9..778e1b3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -33,7 +34,9 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.datatypes.ArrayDataType;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
@@ -61,15 +64,15 @@ public class FieldEncoderFactory {
*
* @param dataField column schema
* @param cache dictionary cache.
- * @param carbonTableIdentifier table identifier
+ * @param absoluteTableIdentifier table identifier
* @param index index of column in the row.
* @param isEmptyBadRecord
* @return
*/
public FieldConverter createFieldEncoder(DataField dataField,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
- CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
- DictionaryClient client, Boolean useOnePass, String storePath,
+ AbsoluteTableIdentifier absoluteTableIdentifier, int index, String nullFormat,
+ DictionaryClient client, Boolean useOnePass,
Map<Object, Integer> localCache, boolean isEmptyBadRecord)
throws IOException {
// Converters are only needed for dimensions and measures it return null.
@@ -85,11 +88,11 @@ public class FieldEncoderFactory {
// in case of child table it will use parent table dictionary
if (null == dataField.getColumn().getColumnSchema().getParentColumnTableRelations()
|| dataField.getColumn().getColumnSchema().getParentColumnTableRelations().isEmpty()) {
- identifier = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
+ identifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType(),
- CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier));
- return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier,
- nullFormat, index, client, useOnePass, storePath, localCache, isEmptyBadRecord,
+ CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier));
+ return new DictionaryFieldConverterImpl(dataField, cache, absoluteTableIdentifier,
+ nullFormat, index, client, useOnePass, localCache, isEmptyBadRecord,
identifier);
} else {
ParentColumnTableRelation parentColumnTableRelation =
@@ -103,17 +106,21 @@ public class FieldEncoderFactory {
ColumnIdentifier parentColumnIdentifier =
new ColumnIdentifier(parentColumnTableRelation.getColumnId(), null,
dataField.getColumn().getDataType());
- identifier =
- new DictionaryColumnUniqueIdentifier(parentTableIdentifier, parentColumnIdentifier,
- dataField.getColumn().getDataType(),
- CarbonStorePath.getCarbonTablePath(storePath, parentTableIdentifier));
- return new DictionaryFieldConverterImpl(dataField, cache, parentTableIdentifier,
- nullFormat, index, null, false, storePath, null, isEmptyBadRecord, identifier);
+ CarbonTablePath carbonTablePath =
+ CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
+ AbsoluteTableIdentifier parentAbsoluteTableIdentifier = new AbsoluteTableIdentifier(
+ CarbonUtil.getNewTablePath(carbonTablePath, parentTableIdentifier),
+ parentTableIdentifier);
+ identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier,
+ parentColumnIdentifier, dataField.getColumn().getDataType(),
+ CarbonStorePath.getCarbonTablePath(parentAbsoluteTableIdentifier));
+ return new DictionaryFieldConverterImpl(dataField, cache, parentAbsoluteTableIdentifier,
+ nullFormat, index, null, false, null, isEmptyBadRecord, identifier);
}
} else if (dataField.getColumn().isComplex()) {
return new ComplexFieldConverterImpl(
- createComplexType(dataField, cache, carbonTableIdentifier,
- client, useOnePass, storePath, localCache), index);
+ createComplexType(dataField, cache, absoluteTableIdentifier,
+ client, useOnePass, localCache), index);
} else {
return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
}
@@ -127,10 +134,10 @@ public class FieldEncoderFactory {
*/
private static GenericDataType createComplexType(DataField dataField,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
- CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
- String storePath, Map<Object, Integer> localCache) {
+ AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
+ Map<Object, Integer> localCache) {
return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
- carbonTableIdentifier, client, useOnePass, storePath, localCache);
+ absoluteTableIdentifier, client, useOnePass, localCache);
}
/**
@@ -140,8 +147,8 @@ public class FieldEncoderFactory {
*/
private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
- CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
- String storePath, Map<Object, Integer> localCache) {
+ AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
+ Map<Object, Integer> localCache) {
DataType dataType = carbonColumn.getDataType();
if (DataTypes.isArrayType(dataType)) {
List<CarbonDimension> listOfChildDimensions =
@@ -151,8 +158,8 @@ public class FieldEncoderFactory {
new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
for (CarbonDimension dimension : listOfChildDimensions) {
arrayDataType.addChildren(
- createComplexType(dimension, carbonColumn.getColName(), cache, carbonTableIdentifier,
- client, useOnePass, storePath, localCache));
+ createComplexType(dimension, carbonColumn.getColName(), cache, absoluteTableIdentifier,
+ client, useOnePass, localCache));
}
return arrayDataType;
} else if (DataTypes.isStructType(dataType)) {
@@ -163,16 +170,16 @@ public class FieldEncoderFactory {
new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
for (CarbonDimension dimension : dimensions) {
structDataType.addChildren(
- createComplexType(dimension, carbonColumn.getColName(), cache, carbonTableIdentifier,
- client, useOnePass, storePath, localCache));
+ createComplexType(dimension, carbonColumn.getColName(), cache, absoluteTableIdentifier,
+ client, useOnePass, localCache));
}
return structDataType;
} else if (DataTypes.isMapType(dataType)) {
throw new UnsupportedOperationException("Complex type Map is not supported yet");
} else {
return new PrimitiveDataType(carbonColumn.getColName(), parentName,
- carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache, carbonTableIdentifier,
- client, useOnePass, storePath, localCache);
+ carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
+ absoluteTableIdentifier, client, useOnePass, localCache);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index 79c6d61..16c4a22 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -85,8 +85,7 @@ public class RowConverterImpl implements RowConverter {
@Override
public void initialize() throws IOException {
CacheProvider cacheProvider = CacheProvider.getInstance();
- cache = cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
- configuration.getTableIdentifier().getStorePath());
+ cache = cacheProvider.createCache(CacheType.REVERSE_DICTIONARY);
String nullFormat =
configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
.toString();
@@ -102,10 +101,8 @@ public class RowConverterImpl implements RowConverter {
for (int i = 0; i < fields.length; i++) {
localCaches[i] = new ConcurrentHashMap<>();
FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
- .createFieldEncoder(fields[i], cache,
- configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
- configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
- localCaches[i], isEmptyBadRecord);
+ .createFieldEncoder(fields[i], cache, configuration.getTableIdentifier(), i, nullFormat,
+ client, configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord);
fieldConverterList.add(fieldConverter);
}
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
@@ -210,10 +207,9 @@ public class RowConverterImpl implements RowConverter {
for (int i = 0; i < fields.length; i++) {
FieldConverter fieldConverter = null;
try {
- fieldConverter = FieldEncoderFactory.getInstance().createFieldEncoder(fields[i], cache,
- configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
- configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
- localCaches[i], isEmptyBadRecord);
+ fieldConverter = FieldEncoderFactory.getInstance()
+ .createFieldEncoder(fields[i], cache, configuration.getTableIdentifier(), i, nullFormat,
+ client, configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord);
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index acd7fed..8c3fe56 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -43,7 +43,7 @@ public class CarbonLoadModel implements Serializable {
private boolean aggLoadRequest;
- private String storePath;
+ private String tablePath;
private boolean isRetentionRequest;
@@ -354,7 +354,7 @@ public class CarbonLoadModel implements Serializable {
copy.dateFormat = dateFormat;
copy.defaultTimestampFormat = defaultTimestampFormat;
copy.maxColumns = maxColumns;
- copy.storePath = storePath;
+ copy.tablePath = tablePath;
copy.useOnePass = useOnePass;
copy.dictionaryServerHost = dictionaryServerHost;
copy.dictionaryServerPort = dictionaryServerPort;
@@ -402,7 +402,7 @@ public class CarbonLoadModel implements Serializable {
copy.dateFormat = dateFormat;
copy.defaultTimestampFormat = defaultTimestampFormat;
copy.maxColumns = maxColumns;
- copy.storePath = storePath;
+ copy.tablePath = tablePath;
copy.useOnePass = useOnePass;
copy.dictionaryServerHost = dictionaryServerHost;
copy.dictionaryServerPort = dictionaryServerPort;
@@ -452,7 +452,7 @@ public class CarbonLoadModel implements Serializable {
copyObj.dateFormat = dateFormat;
copyObj.defaultTimestampFormat = defaultTimestampFormat;
copyObj.maxColumns = maxColumns;
- copyObj.storePath = storePath;
+ copyObj.tablePath = tablePath;
copyObj.useOnePass = useOnePass;
copyObj.dictionaryServerHost = dictionaryServerHost;
copyObj.dictionaryServerPort = dictionaryServerPort;
@@ -480,17 +480,17 @@ public class CarbonLoadModel implements Serializable {
}
/**
- * @param storePath The storePath to set.
+ * @param tablePath The tablePath to set.
*/
- public void setStorePath(String storePath) {
- this.storePath = storePath;
+ public void setTablePath(String tablePath) {
+ this.tablePath = tablePath;
}
/**
* @return Returns the tablePath.
*/
- public String getStorePath() {
- return storePath;
+ public String getTablePath() {
+ return tablePath;
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index aa77fb6..6da50a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -47,7 +47,7 @@ public abstract class AbstractResultProcessor {
CarbonDataFileAttributes carbonDataFileAttributes;
if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
int taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
- CarbonStorePath.getCarbonTablePath(loadModel.getStorePath(),
+ CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(),
carbonTable.getCarbonTableIdentifier()));
// Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
// be written in same segment. So the TaskNo should be incremented by 1 from max val.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index c2ee1bc..c1df349 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -171,9 +171,7 @@ public final class CarbonDataMergerUtil {
AbsoluteTableIdentifier absoluteTableIdentifier =
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
SegmentUpdateStatusManager segmentUpdateStatusManager =
new SegmentUpdateStatusManager(absoluteTableIdentifier);
@@ -298,8 +296,7 @@ public final class CarbonDataMergerUtil {
+ carbonLoadModel.getTableName() + " for table status updation ");
CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ .getCarbonTablePath(absoluteTableIdentifier);
String statusFilePath = carbonTablePath.getTableStatusFilePath();
@@ -388,7 +385,7 @@ public final class CarbonDataMergerUtil {
public static List<LoadMetadataDetails> identifySegmentsToBeMerged(
CarbonLoadModel carbonLoadModel, long compactionSize,
List<LoadMetadataDetails> segments, CompactionType compactionType) {
- String storeLocation = carbonLoadModel.getStorePath();
+ String tablePath = carbonLoadModel.getTablePath();
List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments);
sortSegments(sortedSegments);
@@ -413,7 +410,7 @@ public final class CarbonDataMergerUtil {
if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
- listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, storeLocation);
+ listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, tablePath);
} else {
listOfSegmentsToBeMerged =
@@ -580,7 +577,7 @@ public final class CarbonDataMergerUtil {
*/
private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
- CarbonLoadModel carbonLoadModel, String storeLocation) {
+ CarbonLoadModel carbonLoadModel, String tablePath) {
List<LoadMetadataDetails> segmentsToBeMerged =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -603,7 +600,7 @@ public final class CarbonDataMergerUtil {
String segId = segment.getLoadName();
// variable to store one segment size across partition.
long sizeOfOneSegmentAcrossPartition =
- getSizeOfSegment(storeLocation, tableIdentifier, segId);
+ getSizeOfSegment(tablePath, tableIdentifier, segId);
// if size of a segment is greater than the Major compaction size. then ignore it.
if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
@@ -646,9 +643,9 @@ public final class CarbonDataMergerUtil {
* @param segId segment id
* @return the data size of the segment
*/
- private static long getSizeOfSegment(String storePath,
+ private static long getSizeOfSegment(String tablePath,
CarbonTableIdentifier tableIdentifier, String segId) {
- String loadPath = getStoreLocation(storePath, tableIdentifier, segId);
+ String loadPath = getStoreLocation(tablePath, tableIdentifier, segId);
CarbonFile segmentFolder =
FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
return getSizeOfFactFileInLoad(segmentFolder);
@@ -657,15 +654,15 @@ public final class CarbonDataMergerUtil {
/**
* This method will get the store location for the given path, segment id and partition id
*
- * @param storePath the store path of the segment
+ * @param tablePath the table path of the segment
* @param carbonTableIdentifier identifier of carbon table that the segment belongs to
* @param segmentId segment id
* @return the store location of the segment
*/
- private static String getStoreLocation(String storePath,
+ private static String getStoreLocation(String tablePath,
CarbonTableIdentifier carbonTableIdentifier, String segmentId) {
CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier);
+ CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier);
return carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
}
@@ -1001,9 +998,7 @@ public final class CarbonDataMergerUtil {
CarbonFile[] updateDeltaFiles = null;
Set<String> uniqueBlocks = new HashSet<String>();
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg);
CarbonFile segDir =
@@ -1255,8 +1250,7 @@ public final class CarbonDataMergerUtil {
AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ .getCarbonTablePath(absoluteTableIdentifier);
String tableStatusPath = carbonTablePath.getTableStatusFilePath();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index b6ac19d..4f9458c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -34,7 +35,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -306,7 +306,7 @@ public class CarbonFactDataHandlerModel {
}
carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
String carbonDataDirectoryPath = CarbonDataProcessorUtil
- .checkAndCreateCarbonStoreLocation(loadModel.getStorePath(), loadModel.getDatabaseName(),
+ .checkAndCreateCarbonStoreLocation(loadModel.getTablePath(), loadModel.getDatabaseName(),
tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
@@ -331,17 +331,13 @@ public class CarbonFactDataHandlerModel {
* @return data directory path
*/
private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) {
- String carbonStorePath =
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS);
- CarbonTableIdentifier tableIdentifier =
- configuration.getTableIdentifier().getCarbonTableIdentifier();
- CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
- tableIdentifier.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + tableIdentifier
- .getTableName());
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTable.getCarbonTableIdentifier());
- return carbonTablePath.getCarbonDataDirectoryPath(configuration.getPartitionId(),
- configuration.getSegmentId() + "");
+ AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
+ CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
+ String carbonDataDirectoryPath = carbonTablePath
+ .getCarbonDataDirectoryPath(configuration.getPartitionId(),
+ configuration.getSegmentId() + "");
+ CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
+ return carbonDataDirectoryPath;
}
public int[] getColCardinality() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 5b6998a..29a979d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -89,7 +89,7 @@ public final class CarbonLoaderUtil {
public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+ .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "", currentLoad + "");
@@ -109,7 +109,7 @@ public final class CarbonLoaderUtil {
CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema()
.getCarbonTable();
CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
- loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+ loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
int fileCount = 0;
int partitionCount = carbonTable.getPartitionCount();
@@ -145,7 +145,7 @@ public final class CarbonLoaderUtil {
String metaDataLocation = carbonTable.getMetaDataFilepath();
final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+ .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
//delete folder which metadata no exist in tablestatus
for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
@@ -262,9 +262,7 @@ public final class CarbonLoaderUtil {
loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
AbsoluteTableIdentifier absoluteTableIdentifier =
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
String tableStatusPath = carbonTablePath.getTableStatusFilePath();
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -382,10 +380,10 @@ public final class CarbonLoaderUtil {
loadMetadataDetails.setLoadStartTime(loadStartTime);
}
- public static void writeLoadMetadata(String storeLocation, String dbName, String tableName,
+ public static void writeLoadMetadata(AbsoluteTableIdentifier absoluteTableIdentifier,
List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(storeLocation, dbName, tableName);
+ CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
DataOutputStream dataOutputStream;
@@ -427,20 +425,19 @@ public final class CarbonLoaderUtil {
return date;
}
- public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier,
- String carbonStorePath) throws IOException {
+ public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier)
+ throws IOException {
Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =
- CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, carbonStorePath);
+ CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY);
return dictCache.get(columnIdentifier);
}
- public static Dictionary getDictionary(CarbonTableIdentifier tableIdentifier,
- ColumnIdentifier columnIdentifier, String carbonStorePath, DataType dataType)
+ public static Dictionary getDictionary(AbsoluteTableIdentifier absoluteTableIdentifier,
+ ColumnIdentifier columnIdentifier, DataType dataType)
throws IOException {
return getDictionary(
- new DictionaryColumnUniqueIdentifier(tableIdentifier, columnIdentifier, dataType,
- CarbonStorePath.getCarbonTablePath(carbonStorePath, tableIdentifier)),
- carbonStorePath);
+ new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, columnIdentifier, dataType,
+ CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)));
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index ffdabce..09e7b47 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -24,7 +24,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
@@ -43,17 +43,14 @@ public final class DeleteLoadFolders {
/**
* returns segment path
*
- * @param dbName
- * @param tableName
- * @param storeLocation
+ * @param absoluteTableIdentifier
* @param partitionId
* @param oneLoad
* @return
*/
- private static String getSegmentPath(String dbName, String tableName, String storeLocation,
+ private static String getSegmentPath(AbsoluteTableIdentifier absoluteTableIdentifier,
int partitionId, LoadMetadataDetails oneLoad) {
- CarbonTablePath carbon = new CarbonStorePath(storeLocation).getCarbonTablePath(
- new CarbonTableIdentifier(dbName, tableName, ""));
+ CarbonTablePath carbon = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
String segmentId = oneLoad.getLoadName();
return carbon.getCarbonDataDirectoryPath("" + partitionId, segmentId);
}
@@ -125,15 +122,15 @@ public final class DeleteLoadFolders {
return false;
}
- public static boolean deleteLoadFoldersFromFileSystem(String dbName, String tableName,
- String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
-
+ public static boolean deleteLoadFoldersFromFileSystem(
+ AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
+ LoadMetadataDetails[] details) {
boolean isDeleted = false;
if (details != null && details.length != 0) {
for (LoadMetadataDetails oneLoad : details) {
if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
- String path = getSegmentPath(dbName, tableName, storeLocation, 0, oneLoad);
+ String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
if (deletionStatus) {
isDeleted = true;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index d15b45c..485e718 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -63,7 +63,7 @@ public class BlockIndexStoreTest extends TestCase {
CarbonProperties.getInstance().
addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10");
CacheProvider cacheProvider = CacheProvider.getInstance();
- cache = (BlockIndexStore) cacheProvider.createCache(CacheType.EXECUTOR_BTREE, "");
+ cache = (BlockIndexStore) cacheProvider.createCache(CacheType.EXECUTOR_BTREE);
}
@AfterClass public void tearDown() {
@@ -258,9 +258,7 @@ public class BlockIndexStoreTest extends TestCase {
}
private static File getPartFile() {
- String path = StoreCreator.getAbsoluteTableIdentifier().getStorePath() + "/" + StoreCreator
- .getAbsoluteTableIdentifier().getCarbonTableIdentifier().getDatabaseName() + "/"
- + StoreCreator.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableName()
+ String path = StoreCreator.getAbsoluteTableIdentifier().getTablePath()
+ "/Fact/Part0/Segment_0";
File file = new File(path);
File[] files = file.listFiles();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java b/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
index ddc8657..ba77f29 100644
--- a/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
+++ b/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.lcm.locks;
import java.io.File;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.locks.LocalFileLock;
import org.apache.carbondata.core.locks.LockUsage;
@@ -51,13 +52,15 @@ public class LocalFileLockTest {
@Test public void testingLocalFileLockingByAcquiring2Locks() {
- CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("databaseName", "tableName", "tableId");
+ AbsoluteTableIdentifier absoluteTableIdentifier = AbsoluteTableIdentifier
+ .from(CarbonProperties.getInstance().getProperty("carbon.storelocation"), "databaseName",
+ "tableName");
LocalFileLock localLock1 =
- new LocalFileLock(carbonTableIdentifier,
+ new LocalFileLock(absoluteTableIdentifier,
LockUsage.METADATA_LOCK);
Assert.assertTrue(localLock1.lock());
LocalFileLock localLock2 =
- new LocalFileLock(carbonTableIdentifier,
+ new LocalFileLock(absoluteTableIdentifier,
LockUsage.METADATA_LOCK);
Assert.assertTrue(!localLock2.lock());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java b/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
index 757f2e1..e0b6d66 100644
--- a/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
+++ b/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
@@ -17,6 +17,9 @@
package org.apache.carbondata.lcm.locks;
import mockit.NonStrictExpectations;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.locks.LockUsage;
@@ -95,7 +98,9 @@ public class ZooKeeperLockingTest {
ZookeeperInit zki = ZookeeperInit.getInstance("127.0.0.1:" + freePort);
- CarbonTableIdentifier tableIdentifier = new CarbonTableIdentifier("dbName", "tableName", "tableId");
+ AbsoluteTableIdentifier tableIdentifier = AbsoluteTableIdentifier
+ .from(CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION),
+ "dbName", "tableName");
ZooKeeperLocking zkl =
new ZooKeeperLocking(tableIdentifier,
LockUsage.METADATA_LOCK);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 0f919ab..58cc019 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -99,14 +99,15 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
public class StoreCreator {
private static AbsoluteTableIdentifier absoluteTableIdentifier;
-
+ private static String storePath = "";
static {
try {
- String storePath = new File("target/store").getCanonicalPath();
+ storePath = new File("target/store").getCanonicalPath();
String dbName = "testdb";
String tableName = "testtable";
absoluteTableIdentifier =
- new AbsoluteTableIdentifier(storePath, new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+ new AbsoluteTableIdentifier(storePath + "/testdb/testtable",
+ new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
} catch (IOException ex) {
}
@@ -122,10 +123,10 @@ public class StoreCreator {
public static void createCarbonStore() {
try {
String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
- File storeDir = new File(absoluteTableIdentifier.getStorePath());
+ File storeDir = new File(storePath);
CarbonUtil.deleteFoldersAndFiles(storeDir);
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
- absoluteTableIdentifier.getStorePath());
+ storePath);
CarbonTable table = createTable();
writeDictionary(factFilePath, table);
@@ -137,7 +138,7 @@ public class StoreCreator {
loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
loadModel.setFactFilePath(factFilePath);
loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
- loadModel.setStorePath(absoluteTableIdentifier.getStorePath());
+ loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
loadModel.setDateFormat(null);
loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -165,7 +166,7 @@ public class StoreCreator {
loadModel.setFactTimeStamp(System.currentTimeMillis());
loadModel.setMaxColumns("10");
- loadData(loadModel, absoluteTableIdentifier.getStorePath());
+ loadData(loadModel, storePath);
} catch (Exception e) {
e.printStackTrace();
@@ -174,7 +175,7 @@ public class StoreCreator {
private static CarbonTable createTable() throws IOException {
TableInfo tableInfo = new TableInfo();
- tableInfo.setStorePath(absoluteTableIdentifier.getStorePath());
+ tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
TableSchema tableSchema = new TableSchema();
tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
@@ -263,7 +264,7 @@ public class StoreCreator {
tableInfo.setFactTable(tableSchema);
CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+ .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
absoluteTableIdentifier.getCarbonTableIdentifier());
String schemaFilePath = carbonTablePath.getSchemaFilePath();
String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
@@ -314,21 +315,24 @@ public class StoreCreator {
}
Cache dictCache = CacheProvider.getInstance()
- .createCache(CacheType.REVERSE_DICTIONARY, absoluteTableIdentifier.getStorePath());
+ .createCache(CacheType.REVERSE_DICTIONARY);
for (int i = 0; i < set.length; i++) {
- ColumnIdentifier columnIdentifier = new ColumnIdentifier(dims.get(i).getColumnId(), null, null);
- DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(table.getCarbonTableIdentifier(), columnIdentifier, columnIdentifier.getDataType(),
- CarbonStorePath.getCarbonTablePath(table.getStorePath(), table.getCarbonTableIdentifier()));
+ ColumnIdentifier columnIdentifier =
+ new ColumnIdentifier(dims.get(i).getColumnId(), null, null);
+ DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
+ new DictionaryColumnUniqueIdentifier(table.getAbsoluteTableIdentifier(), columnIdentifier,
+ columnIdentifier.getDataType(), CarbonStorePath
+ .getCarbonTablePath(table.getAbsoluteTableIdentifier().getTablePath(),
+ table.getCarbonTableIdentifier()));
CarbonDictionaryWriter writer =
- new CarbonDictionaryWriterImpl(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier(), dictionaryColumnUniqueIdentifier);
+ new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
for (String value : set[i]) {
writer.write(value);
}
writer.close();
writer.commit();
Dictionary dict = (Dictionary) dictCache.get(
- new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(),
+ new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
columnIdentifier, dims.get(i).getDataType(),
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)));
CarbonDictionarySortInfoPreparator preparator =
@@ -337,9 +341,7 @@ public class StoreCreator {
CarbonDictionarySortInfo dictionarySortInfo =
preparator.getDictionarySortInfo(newDistinctValues, dict, dims.get(i).getDataType());
CarbonDictionarySortIndexWriter carbonDictionaryWriter =
- new CarbonDictionarySortIndexWriterImpl(
- absoluteTableIdentifier.getCarbonTableIdentifier(), dictionaryColumnUniqueIdentifier,
- absoluteTableIdentifier.getStorePath());
+ new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier);
try {
carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex());
carbonDictionaryWriter.writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted());
@@ -405,7 +407,7 @@ public class StoreCreator {
format.createRecordReader(blockDetails, hadoopAttemptContext);
CSVRecordReaderIterator readerIterator = new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
- String[] storeLocationArray = new String[] {storeLocation};
+ String[] storeLocationArray = new String[] {storeLocation + "/" + databaseName + "/" + tableName};
new DataLoadExecutor().execute(loadModel,
storeLocationArray,
new CarbonIterator[]{readerIterator});
@@ -483,7 +485,7 @@ public class StoreCreator {
}
public static String readCurrentTime() {
- SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+ SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
String date = null;
date = sdf.format(new Date());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
index 80936d1..6ee3296 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
@@ -38,7 +38,7 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
LOGGER.info("Carbon streaming query started: " + event.id)
val sink = qry.sink.asInstanceOf[CarbonAppendableStreamSink]
val carbonTable = sink.carbonTable
- val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getCarbonTableIdentifier,
+ val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
LockUsage.STREAMING_LOCK)
if (lock.lockWithRetries()) {
LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName + "." +