You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/20 02:49:23 UTC
[1/2] carbondata git commit: Changed default store location to
spark-warehouse [Forced Update!]
Repository: carbondata
Updated Branches:
refs/heads/metadata 39d7a237f -> 34d2870ae (forced update)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index ab27b4f..bfe1276 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.schema.table
+import org.apache.carbondata.core.metadata.schema
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -50,17 +50,6 @@ trait CarbonMetaStore {
absIdentifier: AbsoluteTableIdentifier,
sparkSession: SparkSession): CarbonRelation
- /**
- * Get table meta
- * TODO remove it if possible
- * @param database
- * @param tableName
- * @param readStore
- * @return
- */
- def getTableFromMetadata(database: String,
- tableName: String,
- readStore: Boolean = false): Option[TableMeta]
def tableExists(
table: String,
@@ -68,8 +57,6 @@ trait CarbonMetaStore {
def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean
- def loadMetadata(metadataPath: String, queryId: String): MetaData
-
/**
* This method will overwrite the existing schema and update it with the given details
*
@@ -90,23 +77,26 @@ trait CarbonMetaStore {
*
* @param carbonTableIdentifier
* @param thriftTableInfo
- * @param carbonStorePath
+ * @param tablePath
* @param sparkSession
*/
def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
- carbonStorePath: String)
+ tablePath: String)
(sparkSession: SparkSession): String
/**
- *
- * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
- * Load CarbonTable from wrapper tableInfo
- *
+ * Prepare Thrift Schema from wrapper TableInfo and write to disk
*/
- def createTableFromThrift(tableInfo: table.TableInfo,
- dbName: String,
- tableName: String)(sparkSession: SparkSession): (String, String)
+ def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String)
+
+ /**
+ * Generates schema string to save it in hive metastore
+ * @param tableInfo
+ * @return
+ */
+ def generateTableSchemaString(tableInfo: schema.table.TableInfo,
+ tablePath: String): String
/**
* This method will remove the table meta from catalog metadata array
@@ -117,25 +107,25 @@ trait CarbonMetaStore {
def removeTableFromMetadata(dbName: String, tableName: String): Unit
def updateMetadataByThriftTable(schemaFilePath: String,
- tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit
+ tableInfo: TableInfo, dbName: String, tableName: String, tablePath: String): Unit
def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean
- def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+ def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
(sparkSession: SparkSession)
- def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String)
+ def updateAndTouchSchemasUpdatedTime(basePath: String)
- def checkSchemasModifiedTimeAndReloadTables()
+ def checkSchemasModifiedTimeAndReloadTables(storePath: String)
def isReadFromHiveMetaStore : Boolean
def listAllTables(sparkSession: SparkSession): Seq[CarbonTable]
- def storePath: String
-
def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
+ def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta]
+
}
/**
@@ -143,12 +133,12 @@ trait CarbonMetaStore {
*/
object CarbonMetaStoreFactory {
- def createCarbonMetaStore(conf: RuntimeConfig, storePath: String): CarbonMetaStore = {
+ def createCarbonMetaStore(conf: RuntimeConfig): CarbonMetaStore = {
val readSchemaFromHiveMetaStore = readSchemaFromHive(conf)
if (readSchemaFromHiveMetaStore) {
- new CarbonHiveMetaStore(conf, storePath)
+ new CarbonHiveMetaStore(conf)
} else {
- new CarbonFileMetastore(conf, storePath)
+ new CarbonFileMetastore(conf)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 4aef118..99a935b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -94,16 +94,17 @@ class CarbonSessionCatalog(
private def refreshRelationFromCache(name: TableIdentifier,
alias: Option[String],
carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): LogicalPlan = {
- carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTables
+ carbonEnv.carbonMetastore.
+ checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(sparkSession).storePath)
carbonEnv.carbonMetastore
- .getTableFromMetadata(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
+ .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
carbonDatasourceHadoopRelation.carbonTable.getFactTableName) match {
case tableMeta: TableMeta =>
if (tableMeta.carbonTable.getTableLastUpdatedTime !=
carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime) {
refreshTable(name)
}
- case _ => refreshTable(name)
+ case _ =>
}
super.lookupRelation(name, alias)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index e94b6ed..f764882 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -52,7 +52,7 @@ object Spark2TestQueryExecutor {
.appName("Spark2TestQueryExecutor")
.enableHiveSupport()
.config("spark.sql.warehouse.dir", TestQueryExecutor.warehouse)
- .getOrCreateCarbonSession(TestQueryExecutor.storeLocation, TestQueryExecutor.metastoredb)
+ .getOrCreateCarbonSession(null, TestQueryExecutor.metastoredb)
spark.sparkContext.setLogLevel("ERROR")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 01bdc4f..74f4dd0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -136,8 +136,9 @@ object AlterTableUtil {
carbonTable.getCarbonTableIdentifier,
thriftTable,
schemaEvolutionEntry,
- carbonTable.getStorePath)(sparkSession)
+ carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+ sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
val schema = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(tableIdentifier)(sparkSession).schema.json
val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
@@ -206,7 +207,8 @@ object AlterTableUtil {
.renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
oldTableIdentifier.table)
val tableIdentifier = new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)
- metastore.revertTableSchema(tableIdentifier, tableInfo, storePath)(sparkSession)
+ metastore.revertTableSchema(tableIdentifier,
+ tableInfo, carbonTablePath.getPath)(sparkSession)
metastore.removeTableFromMetadata(database, newTableName)
}
}
@@ -238,7 +240,7 @@ object AlterTableUtil {
thriftTable.fact_table.table_columns.removeAll(addedSchemas)
metastore
.revertTableSchema(carbonTable.getCarbonTableIdentifier,
- thriftTable, carbonTable.getStorePath)(sparkSession)
+ thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
}
}
@@ -273,7 +275,7 @@ object AlterTableUtil {
}
metastore
.revertTableSchema(carbonTable.getCarbonTableIdentifier,
- thriftTable, carbonTable.getStorePath)(sparkSession)
+ thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
}
}
@@ -311,7 +313,7 @@ object AlterTableUtil {
}
metastore
.revertTableSchema(carbonTable.getCarbonTableIdentifier,
- thriftTable, carbonTable.getStorePath)(sparkSession)
+ thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index 645081f..df73641 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -47,7 +47,8 @@ object CleanFiles {
val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
- CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+ CarbonEnv.getInstance(spark).carbonMetastore.
+ checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
cleanFiles(spark, dbName, tableName, storePath)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index d78fd5f..d00cb84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -56,7 +56,8 @@ object Compaction {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val compactionType = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
- CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+ CarbonEnv.getInstance(spark).carbonMetastore.
+ checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
compaction(spark, dbName, tableName, compactionType)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index f67a5ce..4aaec8f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -47,7 +47,8 @@ object DeleteSegmentByDate {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val dateValue = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
- CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+ CarbonEnv.getInstance(spark).carbonMetastore.
+ checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
deleteSegmentByDate(spark, dbName, tableName, dateValue)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index bbf386e..c86c7f5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -52,7 +52,8 @@ object DeleteSegmentById {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
- CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+ CarbonEnv.getInstance(spark).carbonMetastore.
+ checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
deleteSegmentById(spark, dbName, tableName, segmentIds)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index d5788ba..19d7dce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -78,7 +78,8 @@ object ShowSegments {
None
}
val spark = TableAPIUtil.spark(storePath, s"ShowSegments: $dbName.$tableName")
- CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+ CarbonEnv.getInstance(spark).carbonMetastore.
+ checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
val rows = showSegments(spark, dbName, tableName, limit)
System.out.println(showString(rows))
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 82d8da2..8adaf00 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -80,7 +80,8 @@ object TableLoader {
val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
- CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+ CarbonEnv.getInstance(spark).carbonMetastore.
+ checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(spark).storePath)
loadTable(spark, Option(dbName), tableName, inputPaths, map)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index 25be4a0..fc67cdf 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.test.TestQueryExecutor
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.api.CarbonStore
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
class CarbonCommandSuite extends QueryTest with BeforeAndAfterAll {
@@ -42,32 +43,34 @@ class CarbonCommandSuite extends QueryTest with BeforeAndAfterAll {
dropTable("carbon_table")
}
+ private lazy val location =
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
test("show segment") {
- ShowSegments.main(Array(s"${CarbonUtil.getCarbonStorePath}", "carbon_table"))
+ ShowSegments.main(Array(s"${location}", "carbon_table"))
}
test("delete segment by id") {
- DeleteSegmentById.main(Array(s"${CarbonUtil.getCarbonStorePath}", "carbon_table", "0"))
- assert(!CarbonStore.isSegmentValid("default", "carbon_table", "0"))
+ DeleteSegmentById.main(Array(s"${location}", "carbon_table", "0"))
+ assert(!CarbonStore.isSegmentValid("default", "carbon_table",location, "0"))
}
test("delete segment by date") {
createAndLoadTestTable("carbon_table2", "csv_table")
val time = new Timestamp(new Date().getTime)
- DeleteSegmentByDate.main(Array(s"${CarbonUtil.getCarbonStorePath}", "carbon_table2", time.toString))
- assert(!CarbonStore.isSegmentValid("default", "carbon_table2", "0"))
+ DeleteSegmentByDate.main(Array(s"${location}", "carbon_table2", time.toString))
+ assert(!CarbonStore.isSegmentValid("default", "carbon_table2", location, "0"))
dropTable("carbon_table2")
}
test("clean files") {
val table = "carbon_table3"
createAndLoadTestTable(table, "csv_table")
- ShowSegments.main(Array(s"${CarbonUtil.getCarbonStorePath}", table))
- DeleteSegmentById.main(Array(s"${CarbonUtil.getCarbonStorePath}", table, "0"))
- ShowSegments.main(Array(s"${CarbonUtil.getCarbonStorePath}", table))
- CleanFiles.main(Array(s"${CarbonUtil.getCarbonStorePath}", table))
- ShowSegments.main(Array(s"${CarbonUtil.getCarbonStorePath}", table))
- val tablePath = s"${CarbonUtil.getCarbonStorePath}${File.separator}default${File.separator}$table"
+ ShowSegments.main(Array(s"${location}", table))
+ DeleteSegmentById.main(Array(s"${location}", table, "0"))
+ ShowSegments.main(Array(s"${location}", table))
+ CleanFiles.main(Array(s"${location}", table))
+ ShowSegments.main(Array(s"${location}", table))
+ val tablePath = s"${location}${File.separator}default${File.separator}$table"
val f = new File(s"$tablePath/Fact/Part0")
assert(f.isDirectory)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java b/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
index b84d695..09dbfff 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/TableMeta.java
@@ -29,11 +29,13 @@ public class TableMeta implements Serializable {
public CarbonTableIdentifier carbonTableIdentifier;
public String storePath;
public CarbonTable carbonTable;
+ public String tablePath;
- public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath,
+ public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath, String tablePath,
CarbonTable carbonTable) {
this.carbonTableIdentifier = carbonTableIdentifier;
this.storePath = storePath;
+ this.tablePath = tablePath;
this.carbonTable = carbonTable;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 62f13db..17f4df6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -173,8 +173,10 @@ public final class CarbonDataProcessorUtil {
String taskId, String partitionId, String segmentId, boolean isCompactionFlow) {
String tempLocationKey =
getTempStoreLocationKey(databaseName, tableName, taskId, isCompactionFlow);
- String baseStorePath = CarbonProperties.getInstance()
- .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
+ String baseStorePath = CarbonProperties.getInstance().getProperty(tempLocationKey);
+ if (baseStorePath == null) {
+ LOGGER.warn("Location not set for the key " + tempLocationKey);
+ }
CarbonTable carbonTable = CarbonMetadata.getInstance()
.getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
CarbonTablePath carbonTablePath =
[2/2] carbondata git commit: Changed default store location to
spark-warehouse
Posted by ja...@apache.org.
Changed the default store location to spark-warehouse.
Refactored code to reuse common code paths.
Fixed style issues.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/34d2870a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/34d2870a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/34d2870a
Branch: refs/heads/metadata
Commit: 34d2870aebd10adfacd56e5892a9b1149fb6a08e
Parents: 8d3c9bf
Author: Ravindra Pesala <ra...@gmail.com>
Authored: Fri Jul 14 11:12:57 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Jul 20 10:45:22 2017 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 4 -
.../core/metadata/AbsoluteTableIdentifier.java | 5 +-
.../carbondata/core/util/CarbonProperties.java | 4 -
.../apache/carbondata/core/util/CarbonUtil.java | 28 +-
.../spark/sql/common/util/QueryTest.scala | 6 +-
.../carbondata/spark/load/CarbonLoaderUtil.java | 6 +-
.../org/apache/carbondata/api/CarbonStore.scala | 3 +-
.../spark/sql/test/TestQueryExecutor.scala | 1 -
.../spark/thriftserver/CarbonThriftServer.scala | 4 +-
.../org/apache/spark/sql/CarbonContext.scala | 12 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 3 +-
.../apache/spark/sql/hive/CarbonMetastore.scala | 4 +-
.../spark/thriftserver/CarbonThriftServer.scala | 4 +-
.../carbondata/spark/util/CarbonSparkUtil.scala | 3 +-
.../spark/sql/CarbonDataFrameWriter.scala | 2 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 4 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 12 +-
.../org/apache/spark/sql/CarbonSession.scala | 13 +-
.../org/apache/spark/sql/CarbonSource.scala | 54 ++-
.../execution/CarbonLateDecodeStrategy.scala | 2 +-
.../execution/command/AlterTableCommands.scala | 16 +-
.../execution/command/CarbonHiveCommands.scala | 4 +-
.../sql/execution/command/DDLStrategy.scala | 3 +-
.../execution/command/carbonTableSchema.scala | 35 +-
.../spark/sql/hive/CarbonFileMetastore.scala | 383 +++++++++----------
.../spark/sql/hive/CarbonHiveMetaStore.scala | 132 +------
.../apache/spark/sql/hive/CarbonMetaStore.scala | 54 ++-
.../spark/sql/hive/CarbonSessionState.scala | 7 +-
.../sql/test/Spark2TestQueryExecutor.scala | 2 +-
.../org/apache/spark/util/AlterTableUtil.scala | 12 +-
.../org/apache/spark/util/CleanFiles.scala | 3 +-
.../org/apache/spark/util/Compaction.scala | 3 +-
.../apache/spark/util/DeleteSegmentByDate.scala | 3 +-
.../apache/spark/util/DeleteSegmentById.scala | 3 +-
.../org/apache/spark/util/ShowSegments.scala | 3 +-
.../org/apache/spark/util/TableLoader.scala | 3 +-
.../apache/spark/util/CarbonCommandSuite.scala | 27 +-
.../carbondata/processing/merger/TableMeta.java | 4 +-
.../util/CarbonDataProcessorUtil.java | 6 +-
39 files changed, 410 insertions(+), 467 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 55a292e..f6e5c62 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -71,10 +71,6 @@ public final class CarbonCommonConstants {
@CarbonProperty
public static final String SORT_SIZE = "carbon.sort.size";
/**
- * default location of the carbon member, hierarchy and fact files
- */
- public static final String STORE_LOCATION_DEFAULT_VAL = "../carbon.store";
- /**
* CARDINALITY_INCREMENT_DEFAULT_VALUE
*/
public static final int CARDINALITY_INCREMENT_VALUE_DEFAULT_VAL = 10;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
index 3c39145..22faaf2 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
/**
* identifier which will have store path and carbon table identifier
@@ -68,9 +67,9 @@ public class AbsoluteTableIdentifier implements Serializable {
return carbonTableIdentifier;
}
- public static AbsoluteTableIdentifier from(String dbName, String tableName) {
+ public static AbsoluteTableIdentifier from(String storePath, String dbName, String tableName) {
CarbonTableIdentifier identifier = new CarbonTableIdentifier(dbName, tableName, "");
- return new AbsoluteTableIdentifier(CarbonUtil.getCarbonStorePath(), identifier);
+ return new AbsoluteTableIdentifier(storePath, identifier);
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index d14b7ab..2f5874b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -81,10 +81,6 @@ public final class CarbonProperties {
} catch (IllegalAccessException e) {
LOGGER.error("Illelagal access to declared field" + e.getMessage());
}
- if (null == carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION)) {
- carbonProperties.setProperty(CarbonCommonConstants.STORE_LOCATION,
- CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
- }
validateBlockletSize();
validateNumCores();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b9c164a..59f8cd8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -730,15 +730,6 @@ public final class CarbonUtil {
.startsWith("file://") || lowerPath.startsWith(ALLUXIO_PREFIX);
}
- public static String getCarbonStorePath() {
- CarbonProperties prop = CarbonProperties.getInstance();
- if (null == prop) {
- return null;
- }
- return prop.getProperty(CarbonCommonConstants.STORE_LOCATION,
- CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
- }
-
/**
* This method will check the existence of a file at a given path
*/
@@ -1800,6 +1791,25 @@ public final class CarbonUtil {
}
/**
+ * Removes schema from properties
+ * @param properties
+ * @return
+ */
+ public static Map<String, String> removeSchemaFromMap(Map<String, String> properties) {
+ Map<String, String> newMap = new HashMap<>();
+ newMap.putAll(properties);
+ String partsNo = newMap.get("carbonSchemaPartsNo");
+ if (partsNo == null) {
+ return newMap;
+ }
+ int no = Integer.parseInt(partsNo);
+ for (int i = 0; i < no; i++) {
+ newMap.remove("carbonSchema" + i);
+ }
+ return newMap;
+ }
+
+ /**
* This method will read the schema file from a given path
*
* @param schemaFilePath
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 9912ec4..9926c57 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -27,6 +27,9 @@ import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.test.TestQueryExecutor
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
class QueryTest extends PlanTest {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -85,7 +88,8 @@ class QueryTest extends PlanTest {
val sqlContext: SQLContext = TestQueryExecutor.INSTANCE.sqlContext
- val storeLocation = TestQueryExecutor.storeLocation
+ lazy val storeLocation = CarbonProperties.getInstance().
+ getProperty(CarbonCommonConstants.STORE_LOCATION)
val resourcesPath = TestQueryExecutor.resourcesPath
val integrationPath = TestQueryExecutor.integrationPath
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 5b603aa..5cef14a 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -213,8 +213,10 @@ public final class CarbonLoaderUtil {
String tempLocationKey = CarbonDataProcessorUtil
.getTempStoreLocationKey(databaseName, tableName, loadModel.getTaskNo(), isCompactionFlow);
// form local store location
- final String localStoreLocation = CarbonProperties.getInstance()
- .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
+ final String localStoreLocation = CarbonProperties.getInstance().getProperty(tempLocationKey);
+ if (localStoreLocation == null) {
+ throw new RuntimeException("Store location not set for the key " + tempLocationKey);
+ }
// submit local folder clean up in another thread so that main thread execution is not blocked
ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1);
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 45719fc..dc37360 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -157,8 +157,9 @@ object CarbonStore {
def isSegmentValid(
dbName: String,
tableName: String,
+ storePath: String,
segmentId: String): Boolean = {
- val identifier = AbsoluteTableIdentifier.from(dbName, tableName)
+ val identifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = new
SegmentStatusManager(
identifier).getValidAndInvalidSegments
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index b76bca3..149e3b1 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -53,7 +53,6 @@ object TestQueryExecutor {
val INSTANCE = lookupQueryExecutor.newInstance().asInstanceOf[TestQueryExecutorRegister]
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
- .addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords")
private def lookupQueryExecutor: Class[_] = {
ServiceLoader.load(classOf[TestQueryExecutorRegister], Utils.getContextOrSparkClassLoader)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index b8ba9f7..f8275d1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -57,8 +57,8 @@ object CarbonThriftServer {
"Using default Value and proceeding")
Thread.sleep(30000)
}
-
- val cc = new CarbonContext(sc, args.head)
+ val storePath = if (args.length > 0) args.head else null
+ val cc = new CarbonContext(sc, storePath)
HiveThriftServer2.startWithContext(cc)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 1aeda95..da4b210 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -44,7 +44,7 @@ class CarbonContext(
def this(sc: SparkContext) = {
this(sc,
- new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath,
+ // No explicit store path: the catalog then keeps STORE_LOCATION from carbon.properties
+ // or falls back to spark.sql.warehouse.dir.
+ null,
new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
}
@@ -66,8 +66,14 @@ class CarbonContext(
@transient
override lazy val catalog = {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+ val carbonProperties = CarbonProperties.getInstance()
+ // Resolve the store location: an explicit storePath wins; otherwise a value already present
+ // in carbon.properties is kept (backward compatible); otherwise fall back to
+ // spark.sql.warehouse.dir. That key is a Spark 2.x setting and may be unset here, and
+ // getConfString without a default throws for unset keys, so default to ./spark-warehouse.
+ if (storePath != null) {
+ carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+ } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) {
+ carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
+ conf.getConfString("spark.sql.warehouse.dir",
+ new File("spark-warehouse").getCanonicalPath))
+ }
+ // Hand the resolved location to the metastore; storePath itself may be null, and the
+ // metastore uses it for TableMeta and tableInfo.setStorePath.
+ val resolvedStorePath = carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION)
- new CarbonMetastore(this, storePath, metadataHive, queryId) with OverrideCatalog
+ new CarbonMetastore(this, resolvedStorePath, metadataHive, queryId) with OverrideCatalog
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 2fc93e6..71ef6a6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -69,7 +69,8 @@ private[sql] case class CarbonDatasourceHadoopRelation(
carbonTable.getDatabaseName,
carbonTable.getFactTableName,
CarbonSparkUtil.createSparkMeta(carbonTable),
- new TableMeta(carbonTable.getCarbonTableIdentifier, paths.head, carbonTable),
+ new TableMeta(carbonTable.getCarbonTableIdentifier,
+ paths.head, absIdentifier.getTablePath, carbonTable),
None
)(sqlContext)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index e8d3907..7790f59 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -236,7 +236,7 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
- carbonTable)
+ null, carbonTable)
}
}
})
@@ -281,7 +281,7 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
tableInfo.setMetaDataFilepath(schemaMetadataPath)
tableInfo.setStorePath(storePath)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
+ val tableMeta = new TableMeta(carbonTableIdentifier, storePath, null,
CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName))
val fileType = FileFactory.getFileType(schemaMetadataPath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index aba6891..34ac940 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -53,9 +53,9 @@ object CarbonThriftServer {
System.setProperty("carbon.properties.filepath", sparkConf.get("carbon.properties.filepath"))
}
- CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, args.head)
+ val storePath = if (args.length > 0) args.head else null
- val spark = builder.getOrCreateCarbonSession(args.head)
+ val spark = builder.getOrCreateCarbonSession(storePath)
val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000")
try {
Thread.sleep(Integer.parseInt(warmUpTime))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index d1d3015..de7f3fb 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -48,7 +48,8 @@ object CarbonSparkUtil {
+ /**
+ * Builds a CarbonRelation for the given table schema located at tablePath.
+ * The store path and table identifier are derived from tablePath via
+ * AbsoluteTableIdentifier.fromTablePath; tablePath itself is also recorded in the TableMeta.
+ */
def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = {
val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
val table = CarbonTable.buildFromTableInfo(tableInfo)
- val meta = new TableMeta(identifier.getCarbonTableIdentifier, identifier.getStorePath, table)
+ val meta = new TableMeta(identifier.getCarbonTableIdentifier,
+ identifier.getStorePath, tablePath, table)
CarbonRelation(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName,
CarbonSparkUtil.createSparkMeta(table), meta)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 1054c62..805a421 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -58,7 +58,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
*/
private def loadTempCSV(options: CarbonOption): Unit = {
// temporary solution: write to csv file, then load the csv into carbon
- val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.storePath
+ val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).storePath
val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
.append("tempCSV")
.append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 33091aa..43f6a21 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -67,7 +67,7 @@ case class CarbonDictionaryDecoder(
override def doExecute(): RDD[InternalRow] = {
attachTree(this, "execute") {
- val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
+ val storePath = CarbonEnv.getInstance(sparkSession).storePath
val absoluteTableIdentifiers = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
(carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -121,7 +121,7 @@ case class CarbonDictionaryDecoder(
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
- val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
+ val storePath = CarbonEnv.getInstance(sparkSession).storePath
val absoluteTableIdentifiers = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
(carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index d19eb39..9d10ea0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -40,6 +40,8 @@ class CarbonEnv {
var carbonSessionInfo: CarbonSessionInfo = _
+ var storePath: String = _
+
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
// set readsupport class global so that the executor can get it.
@@ -61,10 +63,14 @@ class CarbonEnv {
// add session params after adding DefaultCarbonParams
config.addDefaultCarbonSessionParams()
carbonMetastore = {
- val storePath =
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
+ val properties = CarbonProperties.getInstance()
+ storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
+ if (storePath == null) {
+ storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
+ properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+ }
LOGGER.info(s"carbon env initial: $storePath")
- CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf, storePath)
+ CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
}
CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
initialized = true
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index b436891..7390cf3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -65,7 +65,7 @@ object CarbonSession {
def getOrCreateCarbonSession(): SparkSession = {
getOrCreateCarbonSession(
- new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath,
+ // No explicit store path: presumably the two-argument overload then keeps STORE_LOCATION
+ // from carbon.properties or falls back to spark.sql.warehouse.dir — confirm in the overload.
+ null,
new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
}
@@ -145,9 +145,14 @@ object CarbonSession {
}
sc
}
-
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+ val carbonProperties = CarbonProperties.getInstance()
+ if (storePath != null) {
+ carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+ // In case if it is in carbon.properties for backward compatible
+ } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) {
+ carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
+ sparkContext.conf.get("spark.sql.warehouse.dir"))
+ }
session = new CarbonSession(sparkContext)
options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
SparkSession.setDefaultSession(session)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 498ea03..bec163b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -25,15 +25,19 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.{CreateTable, TableModel, TableNewProcessor}
+import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DecimalType, StructType}
+import org.apache.spark.sql.types.StructType
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -146,7 +150,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
- dataSchema: StructType): String = {
+ dataSchema: StructType) = {
val dbName: String = parameters.getOrElse("dbName",
CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
@@ -158,7 +162,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
} else {
CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(Option(dbName), tableName)(sparkSession)
- CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+ CarbonEnv.getInstance(sparkSession).storePath + s"/$dbName/$tableName"
}
} catch {
case ex: NoSuchTableException =>
@@ -168,7 +172,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
dbName,
tableName)
CreateTable(cm, false).run(sparkSession)
- CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+ getPathForTable(sparkSession, dbName, tableName, parameters)
case ex: Exception =>
throw new Exception("do not have dbname and tablename for carbon table", ex)
}
@@ -195,9 +199,9 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
if (parameters.contains("tablePath")) {
parameters.get("tablePath").get
} else {
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(Option(dbName), tableName)(sparkSession)
- CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+ relation.tableMeta.tablePath
}
} catch {
case ex: Exception =>
@@ -234,22 +238,46 @@ object CarbonSource {
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val storageFormat = tableDesc.storage
val properties = storageFormat.properties
- if (metaStore.isReadFromHiveMetaStore && !properties.contains("carbonSchemaPartsNo")) {
+ if (!properties.contains("carbonSchemaPartsNo")) {
val dbName: String = properties.getOrElse("dbName",
CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
val tableName: String = properties.getOrElse("tableName", "").toLowerCase
val model = createTableInfoFromParams(properties, tableDesc.schema, dbName, tableName)
val tableInfo: TableInfo = TableNewProcessor(model)
- val (tablePath, carbonSchemaString) =
- metaStore.createTableFromThrift(tableInfo, dbName, tableName)(sparkSession)
- val map = CarbonUtil.convertToMultiStringMap(tableInfo)
+ val tablePath = CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName
+ val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+ schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+ tableInfo.getFactTable.getSchemaEvalution.
+ getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+ val map = if (metaStore.isReadFromHiveMetaStore) {
+ val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+ val schemaMetadataPath =
+ CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+ tableInfo.setMetaDataFilepath(schemaMetadataPath)
+ tableInfo.setStorePath(tableIdentifier.getStorePath)
+ CarbonUtil.convertToMultiStringMap(tableInfo)
+ } else {
+ metaStore.saveToDisk(tableInfo, tablePath)
+ new java.util.HashMap[String, String]()
+ }
properties.foreach(e => map.put(e._1, e._2))
map.put("tablePath", tablePath)
// updating params
val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
tableDesc.copy(storage = updatedFormat)
} else {
- tableDesc
+ val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
+ if (!metaStore.isReadFromHiveMetaStore) {
+ // save to disk
+ metaStore.saveToDisk(tableInfo, properties.get("tablePath").get)
+ // remove schema string from map as we don't store carbon schema to hive metastore
+ val map = CarbonUtil.removeSchemaFromMap(properties.asJava)
+ val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
+ tableDesc.copy(storage = updatedFormat)
+ } else {
+ tableDesc
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 1cc6668..33bba8f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -102,7 +102,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
CarbonAliasDecoderRelation(),
rdd,
output,
- CarbonEnv.getInstance(SparkSession.getActiveSession.get).carbonMetastore.storePath,
+ CarbonEnv.getInstance(SparkSession.getActiveSession.get).storePath,
table.carbonTable.getTableInfo.serialize())
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 0d5a821..17e456d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
import org.apache.spark.util.AlterTableUtil
@@ -29,7 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -167,11 +168,12 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
locks = AlterTableUtil
.validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
sparkSession)
- carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
- .asInstanceOf[CarbonRelation].tableMeta.carbonTable
+ val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+ .asInstanceOf[CarbonRelation].tableMeta
+ carbonTable = tableMeta.carbonTable
// get the latest carbon table and check for column existence
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
+ val carbonTablePath = CarbonStorePath.
+ getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath))
val tableMetadataFile = carbonTablePath.getPath
val tableInfo: org.apache.carbondata.format.TableInfo =
metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -196,7 +198,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
carbonTable.getCarbonTableIdentifier,
tableInfo,
schemaEvolutionEntry,
- carbonTable.getStorePath)(sparkSession)
+ tableMeta.tablePath)(sparkSession)
metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
.runSqlHive(
@@ -206,6 +208,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
s"('tableName'='$newTableName', " +
s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+ sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
+ Some(oldDatabaseName)).quotedString)
LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index 609f39b..2731104 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -41,8 +41,8 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
CarbonDropTableCommand(true, tableName.database, tableName.table).run(sparkSession)
}
}
- CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase, CarbonEnv.getInstance(sparkSession)
- .carbonMetastore.storePath)
+ CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase,
+ CarbonEnv.getInstance(sparkSession).storePath)
rows
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 760cb06..18d2dc7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -62,8 +62,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
_, child: LogicalPlan, _, _) =>
ExecutedCommandExec(LoadTableByInsert(relation, child)) :: Nil
case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
- CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).
- carbonMetastore.storePath)
+ CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).storePath)
ExecutedCommandExec(createDb) :: Nil
case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index cc18fa3..eeb2022 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
@@ -156,7 +156,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+ carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
var storeLocation = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
@@ -196,7 +196,9 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
}
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- CarbonEnv.getInstance(sparkSession).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+ val storePath = CarbonEnv.getInstance(sparkSession).storePath
+ CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ checkSchemasModifiedTimeAndReloadTables(storePath)
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession)
val tbName = cm.tableName
@@ -218,10 +220,11 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
sys.error(s"Table [$tbName] already exists under database [$dbName]")
}
} else {
+ val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
// Add Database to catalog and persist
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val (tablePath, carbonSchemaString) =
- catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
+ val tablePath = tableIdentifier.getTablePath
+ val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
if (createDSTable) {
try {
val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
@@ -239,7 +242,7 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
// call the drop table to delete the created table.
CarbonEnv.getInstance(sparkSession).carbonMetastore
- .dropTable(catalog.storePath, identifier)(sparkSession)
+ .dropTable(tablePath, identifier)(sparkSession)
LOGGER.audit(s"Table creation with Database name [$dbName] " +
s"and Table name [$tbName] failed")
@@ -332,9 +335,7 @@ object LoadTable {
tableInfo, entry, carbonTablePath.getPath)(sparkSession)
// update the schema modified time
- metastore.updateAndTouchSchemasUpdatedTime(
- carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName)
+ metastore.updateAndTouchSchemasUpdatedTime(model.hdfsLocation)
// update CarbonDataLoadSchema
val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName),
@@ -526,7 +527,7 @@ case class LoadTable(
val carbonLoadModel = new CarbonLoadModel()
carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+ carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
val table = relation.tableMeta.carbonTable
carbonLoadModel.setTableName(table.getFactTableName)
@@ -882,7 +883,10 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- catalog.checkSchemasModifiedTimeAndReloadTables()
+ val tableIdentifier =
+ AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
+ dbName.toLowerCase, tableName.toLowerCase)
+ catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
try {
locksToBeAcquired foreach {
@@ -891,7 +895,7 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
CarbonEnv.getInstance(sparkSession).carbonMetastore
- .dropTable(catalog.storePath, identifier)(sparkSession)
+ .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession)
LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
} catch {
case ex: Exception =>
@@ -910,10 +914,11 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
override def processData(sparkSession: SparkSession): Seq[Row] = {
// delete the table folder
val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val metadataFilePath = CarbonStorePath
- .getCarbonTablePath(catalog.storePath, carbonTableIdentifier).getMetadataDirectoryPath
+ val tableIdentifier =
+ AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName)
+ val metadataFilePath =
+ CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath
val fileType = FileFactory.getFileType(metadataFilePath)
if (FileFactory.isFileExist(metadataFilePath, fileType)) {
val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 549841b..2407054 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -22,22 +22,23 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
import org.apache.carbondata.core.fileoperations.FileWriteOperation
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema
+import org.apache.carbondata.core.metadata.schema.table
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -62,7 +63,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
}
}
-class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore {
+class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
@transient
val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -77,7 +78,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
System.nanoTime() + ""
}
- lazy val metadata = loadMetadata(storePath, nextQueryId)
+ val metadata = MetaData(new ArrayBuffer[TableMeta]())
/**
@@ -90,9 +91,22 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
override def createCarbonRelation(parameters: Map[String, String],
absIdentifier: AbsoluteTableIdentifier,
sparkSession: SparkSession): CarbonRelation = {
- lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName,
- Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession)
- .asInstanceOf[CarbonRelation]
+ val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName
+ val tableName = absIdentifier.getCarbonTableIdentifier.getTableName
+ val tables = getTableFromMetadataCache(database, tableName)
+ tables match {
+ case Some(t) =>
+ CarbonRelation(database, tableName,
+ CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
+ case None =>
+ readCarbonSchema(absIdentifier) match {
+ case Some(meta) =>
+ CarbonRelation(database, tableName,
+ CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
+ case None =>
+ throw new NoSuchTableException(database, tableName)
+ }
+ }
}
def lookupRelation(dbName: Option[String], tableName: String)
@@ -100,20 +114,21 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
}
- def lookupRelation(tableIdentifier: TableIdentifier)
+ override def lookupRelation(tableIdentifier: TableIdentifier)
(sparkSession: SparkSession): LogicalPlan = {
- checkSchemasModifiedTimeAndReloadTables()
val database = tableIdentifier.database.getOrElse(
- sparkSession.catalog.currentDatabase
- )
- val tables = getTableFromMetadata(database, tableIdentifier.table, true)
- tables match {
- case Some(t) =>
- CarbonRelation(database, tableIdentifier.table,
- CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head)
- case None =>
- throw new NoSuchTableException(database, tableIdentifier.table)
+ sparkSession.catalog.currentDatabase)
+ val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
+ case SubqueryAlias(_,
+ LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
+ _) =>
+ carbonDatasourceHadoopRelation.carbonRelation
+ case LogicalRelation(
+ carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+ carbonDatasourceHadoopRelation.carbonRelation
+ case _ => throw new NoSuchTableException(database, tableIdentifier.table)
}
+ relation
}
/**
@@ -123,8 +138,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
* @param tableName
* @return
*/
- def getTableFromMetadata(database: String,
- tableName: String, readStore: Boolean = false): Option[TableMeta] = {
+ def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
metadata.tablesMeta
.find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
@@ -136,99 +150,48 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
tableExists(TableIdentifier(table, databaseOp))(sparkSession)
}
- def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
- checkSchemasModifiedTimeAndReloadTables()
- val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
- val tables = metadata.tablesMeta.filter(
- c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
- tables.nonEmpty
- }
-
- def loadMetadata(metadataPath: String, queryId: String): MetaData = {
- val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
- val statistic = new QueryStatistic()
- // creating zookeeper instance once.
- // if zookeeper is configured as carbon lock type.
- val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
- if (null != zookeeperurl) {
- CarbonProperties.getInstance
- .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
- }
- if (metadataPath == null) {
- return null
- }
- // if no locktype is configured and store type is HDFS set HDFS lock as default
- if (null == CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
- FileType.HDFS == FileFactory.getFileType(metadataPath)) {
- CarbonProperties.getInstance
- .addProperty(CarbonCommonConstants.LOCK_TYPE,
- CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
- )
- LOGGER.info("Default lock type HDFSLOCK is configured")
+ override def tableExists(tableIdentifier: TableIdentifier)
+ (sparkSession: SparkSession): Boolean = {
+ try {
+ lookupRelation(tableIdentifier)(sparkSession)
+ } catch {
+ case e: Exception =>
+ return false
}
- val fileType = FileFactory.getFileType(metadataPath)
- val metaDataBuffer = new ArrayBuffer[TableMeta]
- fillMetaData(metadataPath, fileType, metaDataBuffer)
- updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
- statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
- System.currentTimeMillis())
- recorder.recordStatisticsForDriver(statistic, queryId)
- MetaData(metaDataBuffer)
+ true
}
- private def fillMetaData(basePath: String, fileType: FileType,
- metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
- val databasePath = basePath // + "/schemas"
- try {
- if (FileFactory.isFileExist(databasePath, fileType)) {
- val file = FileFactory.getCarbonFile(databasePath, fileType)
- val databaseFolders = file.listFiles()
-
- databaseFolders.foreach(databaseFolder => {
- if (databaseFolder.isDirectory) {
- val dbName = databaseFolder.getName
- val tableFolders = databaseFolder.listFiles()
-
- tableFolders.foreach(tableFolder => {
- if (tableFolder.isDirectory) {
- val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
- tableFolder.getName, UUID.randomUUID().toString)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
- carbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
-
- if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
- val tableName = tableFolder.getName
- val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
- val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
- val schemaConverter = new ThriftWrapperSchemaConverterImpl
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
- val schemaFilePath = CarbonStorePath
- .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
- wrapperTableInfo.setStorePath(storePath)
- wrapperTableInfo
- .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
- CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
- metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
- carbonTable)
- }
- }
- })
- }
- })
- } else {
- // Create folders and files.
- FileFactory.mkdirs(databasePath, fileType)
- }
- } catch {
- case s: java.io.FileNotFoundException =>
- s.printStackTrace()
- // Create folders and files.
- FileFactory.mkdirs(databasePath, fileType)
+ private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
+ val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
+ val tableName = identifier.getCarbonTableIdentifier.getTableName
+ val storePath = identifier.getStorePath
+ val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
+ tableName.toLowerCase(), UUID.randomUUID().toString)
+ val carbonTablePath =
+ CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val fileType = FileFactory.getFileType(tableMetadataFile)
+ if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+ val tableUniqueName = dbName + "_" + tableName
+ val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+ val schemaFilePath = CarbonStorePath
+ .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+ wrapperTableInfo.setStorePath(storePath)
+ wrapperTableInfo
+ .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+ CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+ val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
+ identifier.getStorePath,
+ identifier.getTablePath,
+ carbonTable)
+ metadata.tablesMeta += tableMeta
+ Some(tableMeta)
+ } else {
+ None
}
}
@@ -238,15 +201,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
* @param newTableIdentifier
* @param thriftTableInfo
* @param schemaEvolutionEntry
- * @param carbonStorePath
+ * @param tablePath
* @param sparkSession
*/
def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
schemaEvolutionEntry: SchemaEvolutionEntry,
- carbonStorePath: String)
- (sparkSession: SparkSession): String = {
+ tablePath: String) (sparkSession: SparkSession): String = {
+ val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
if (schemaEvolutionEntry != null) {
thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
@@ -255,11 +218,19 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
.fromExternalToWrapperTableInfo(thriftTableInfo,
newTableIdentifier.getDatabaseName,
newTableIdentifier.getTableName,
- carbonStorePath)
- createSchemaThriftFile(wrapperTableInfo,
+ absoluteTableIdentifier.getStorePath)
+ val identifier =
+ new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
+ newTableIdentifier.getTableName,
+ wrapperTableInfo.getFactTable.getTableId)
+ val path = createSchemaThriftFile(wrapperTableInfo,
thriftTableInfo,
+ identifier)
+ addTableCache(wrapperTableInfo,
+ AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath,
newTableIdentifier.getDatabaseName,
- newTableIdentifier.getTableName)(sparkSession)
+ newTableIdentifier.getTableName))
+ path
}
/**
@@ -267,25 +238,27 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
*
* @param carbonTableIdentifier
* @param thriftTableInfo
- * @param carbonStorePath
+ * @param tablePath
* @param sparkSession
*/
def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
- carbonStorePath: String)
- (sparkSession: SparkSession): String = {
+ tablePath: String)(sparkSession: SparkSession): String = {
+ val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
carbonTableIdentifier.getDatabaseName,
carbonTableIdentifier.getTableName,
- carbonStorePath)
+ tableIdentifier.getStorePath)
val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
evolutionEntries.remove(evolutionEntries.size() - 1)
- createSchemaThriftFile(wrapperTableInfo,
+ wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+ val path = createSchemaThriftFile(wrapperTableInfo,
thriftTableInfo,
- carbonTableIdentifier.getDatabaseName,
- carbonTableIdentifier.getTableName)(sparkSession)
+ tableIdentifier.getCarbonTableIdentifier)
+ addTableCache(wrapperTableInfo, tableIdentifier)
+ path
}
@@ -296,24 +269,38 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
* Load CarbonTable from wrapper tableInfo
*
*/
- def createTableFromThrift(
- tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
- dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = {
- if (tableExists(tableName, Some(dbName))(sparkSession)) {
- sys.error(s"Table [$tableName] already exists under Database [$dbName]")
- }
- val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
+ def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val dbName = tableInfo.getDatabaseName
+ val tableName = tableInfo.getFactTable.getTableName
val thriftTableInfo = schemaConverter
.fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
- thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
- .add(schemaEvolutionEntry)
- val carbonTablePath = createSchemaThriftFile(tableInfo,
+ val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ tableInfo.setStorePath(identifier.getStorePath)
+ createSchemaThriftFile(tableInfo,
thriftTableInfo,
- dbName,
- tableName)(sparkSession)
+ identifier.getCarbonTableIdentifier)
LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
- (carbonTablePath, "")
+ }
+
+ /**
+ * Generates schema string from TableInfo
+ */
+ override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
+ tablePath: String): String = {
+ val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+ val schemaMetadataPath =
+ CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+ tableInfo.setMetaDataFilepath(schemaMetadataPath)
+ tableInfo.setStorePath(tableIdentifier.getStorePath)
+ val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+ schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+ tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+ removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
+ CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+ addTableCache(tableInfo, tableIdentifier)
+ CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
}
/**
@@ -321,23 +308,16 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
*
* @param tableInfo
* @param thriftTableInfo
- * @param dbName
- * @param tableName
- * @param sparkSession
* @return
*/
- private def createSchemaThriftFile(
- tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
- thriftTableInfo: org.apache.carbondata.format.TableInfo,
- dbName: String, tableName: String)
- (sparkSession: SparkSession): String = {
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
- tableInfo.getFactTable.getTableId)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ private def createSchemaThriftFile(tableInfo: schema.table.TableInfo,
+ thriftTableInfo: TableInfo,
+ carbonTableIdentifier: CarbonTableIdentifier): String = {
+ val carbonTablePath = CarbonStorePath.
+ getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier)
val schemaFilePath = carbonTablePath.getSchemaFilePath
val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
tableInfo.setMetaDataFilepath(schemaMetadataPath)
- tableInfo.setStorePath(storePath)
val fileType = FileFactory.getFileType(schemaMetadataPath)
if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
FileFactory.mkdirs(schemaMetadataPath, fileType)
@@ -346,13 +326,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
thriftWriter.open(FileWriteOperation.OVERWRITE)
thriftWriter.write(thriftTableInfo)
thriftWriter.close()
- removeTableFromMetadata(dbName, tableName)
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath))
+ carbonTablePath.getPath
+ }
+
+ protected def addTableCache(tableInfo: table.TableInfo,
+ absoluteTableIdentifier: AbsoluteTableIdentifier) = {
+ val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
+ CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
+ removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
- CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
+ val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath,
+ absoluteTableIdentifier.getTablePath,
+ CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
metadata.tablesMeta += tableMeta
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
- carbonTablePath.getPath
}
/**
@@ -362,13 +349,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
* @param tableName
*/
def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
- val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName)
+ val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
metadataToBeRemoved match {
case Some(tableMeta) =>
metadata.tablesMeta -= tableMeta
CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
case None =>
- LOGGER.debug(s"No entry for table $tableName in database $dbName")
+ if (LOGGER.isDebugEnabled) {
+ LOGGER.debug(s"No entry for table $tableName in database $dbName")
+ }
}
}
@@ -402,23 +391,23 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
- val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
- val tableName = tableIdentifier.table.toLowerCase
-
- val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
- new CarbonTableIdentifier(dbName, tableName, "")).getPath
-
- val fileType = FileFactory.getFileType(tablePath)
- FileFactory.isFileExist(tablePath, fileType)
+ try {
+ val tablePath = lookupRelation(tableIdentifier)(sparkSession).
+ asInstanceOf[CarbonRelation].tableMeta.tablePath
+ val fileType = FileFactory.getFileType(tablePath)
+ FileFactory.isFileExist(tablePath, fileType)
+ } catch {
+ case e: Exception =>
+ false
+ }
}
- def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+ def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
(sparkSession: SparkSession) {
val dbName = tableIdentifier.database.get
val tableName = tableIdentifier.table
-
- val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
- new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+ val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+ val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
if (null != carbonTable) {
// clear driver B-tree and dictionary cache
@@ -429,26 +418,18 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
if (FileFactory.isFileExist(metadataFilePath, fileType)) {
// while drop we should refresh the schema modified time so that if any thing has changed
// in the other beeline need to update.
- checkSchemasModifiedTimeAndReloadTables
-
- val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
- tableIdentifier.table)
- metadataToBeRemoved match {
- case Some(tableMeta) =>
- metadata.tablesMeta -= tableMeta
- CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
- case None =>
- LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
- }
+ checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+
+ removeTableFromMetadata(dbName, tableName)
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
}
}
- private def getTimestampFileAndType(databaseName: String, tableName: String) = {
- val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+ private def getTimestampFileAndType(basePath: String) = {
+ val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
val timestampFileType = FileFactory.getFileType(timestampFile)
(timestampFile, timestampFileType)
}
@@ -462,37 +443,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
tableModifiedTimeStore.put("default", timeStamp)
}
- def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) {
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName))
+ def updateAndTouchSchemasUpdatedTime(basePath: String) {
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath))
}
- /**
- * This method will read the timestamp of empty schema file
- *
- * @param databaseName
- * @param tableName
- * @return
- */
- private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
- if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
- FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
- } else {
- System.currentTimeMillis()
- }
- }
/**
* This method will check and create an empty schema timestamp file
*
- * @param databaseName
- * @param tableName
* @return
*/
- private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
- val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+ private def touchSchemaFileSystemTime(basePath: String): Long = {
+ val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath)
if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
- LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
+ LOGGER.audit(s"Creating timestamp file for $basePath")
FileFactory.createNewFile(timestampFile, timestampFileType)
}
val systemTime = System.currentTimeMillis()
@@ -501,8 +465,9 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
systemTime
}
- def checkSchemasModifiedTimeAndReloadTables() {
- val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
+ def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
+ val (timestampFile, timestampFileType) =
+ getTimestampFileAndType(storePath)
if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
getLastModifiedTime ==
@@ -513,7 +478,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
}
private def refreshCache() {
- metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
+ metadata.tablesMeta.clear()
}
override def isReadFromHiveMetaStore: Boolean = false
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index a8f92ce..c328130 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -16,21 +16,15 @@
*/
package org.apache.spark.sql.hive
-import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.format
@@ -41,12 +35,10 @@ import org.apache.carbondata.spark.util.CarbonSparkUtil
/**
* Metastore to store carbonschema in hive
*/
-class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
- extends CarbonFileMetastore(conf, storePath) {
+class CarbonHiveMetaStore(conf: RuntimeConfig) extends CarbonFileMetastore(conf) {
override def isReadFromHiveMetaStore: Boolean = true
-
/**
* Create spark session from paramters.
*
@@ -61,7 +53,7 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
if (info != null) {
val table = CarbonTable.buildFromTableInfo(info)
val meta = new TableMeta(table.getCarbonTableIdentifier,
- table.getStorePath, table)
+ absIdentifier.getStorePath, absIdentifier.getTablePath, table)
CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
CarbonSparkUtil.createSparkMeta(table), meta)
} else {
@@ -69,111 +61,30 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
}
}
- override def lookupRelation(tableIdentifier: TableIdentifier)
- (sparkSession: SparkSession): LogicalPlan = {
- val database = tableIdentifier.database.getOrElse(
- sparkSession.catalog.currentDatabase)
- val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
- case SubqueryAlias(_,
- LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
- _) =>
- carbonDatasourceHadoopRelation.carbonRelation
- case LogicalRelation(
- carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
- carbonDatasourceHadoopRelation.carbonRelation
- case _ => throw new NoSuchTableException(database, tableIdentifier.table)
- }
- relation
- }
-
- /**
- * This method will search for a table in the catalog metadata
- *
- * @param database
- * @param tableName
- * @return
- */
- override def getTableFromMetadata(database: String,
- tableName: String,
- readStore: Boolean): Option[TableMeta] = {
- if (!readStore) {
- None
- } else {
- super.getTableFromMetadata(database, tableName, readStore)
- }
- }
-
- override def tableExists(tableIdentifier: TableIdentifier)
- (sparkSession: SparkSession): Boolean = {
- try {
- lookupRelation(tableIdentifier)(sparkSession)
- } catch {
- case e: Exception =>
- return false
- }
- true
- }
-
- override def loadMetadata(metadataPath: String,
- queryId: String): MetaData = {
- MetaData(new ArrayBuffer[TableMeta])
- }
-
-
- /**
- *
- * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
- * Load CarbonTable from wrapper tableInfo
- *
- */
- override def createTableFromThrift(tableInfo: TableInfo, dbName: String,
- tableName: String)(sparkSession: SparkSession): (String, String) = {
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
- tableInfo.getFactTable.getTableId)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
- val schemaMetadataPath =
- CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
- tableInfo.setMetaDataFilepath(schemaMetadataPath)
- tableInfo.setStorePath(storePath)
- val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
- schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
- tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
- removeTableFromMetadata(dbName, tableName)
- CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- (carbonTablePath.getPath, CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ","))
- }
-
- /**
- * This method will remove the table meta from catalog metadata array
- *
- * @param dbName
- * @param tableName
- */
- override def removeTableFromMetadata(dbName: String,
- tableName: String): Unit = {
- // do nothing
- }
override def isTablePathExists(tableIdentifier: TableIdentifier)
(sparkSession: SparkSession): Boolean = {
tableExists(tableIdentifier)(sparkSession)
}
- override def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+ override def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
(sparkSession: SparkSession): Unit = {
val dbName = tableIdentifier.database.get
val tableName = tableIdentifier.table
+ val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
if (null != carbonTable) {
// clear driver B-tree and dictionary cache
ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
}
+ checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+ removeTableFromMetadata(dbName, tableName)
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
}
- override def checkSchemasModifiedTimeAndReloadTables(): Unit = {
+ override def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
// do nothing now
}
@@ -200,23 +111,23 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
* @param newTableIdentifier
* @param thriftTableInfo
* @param schemaEvolutionEntry
- * @param carbonStorePath
* @param sparkSession
*/
override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: format.TableInfo,
schemaEvolutionEntry: SchemaEvolutionEntry,
- carbonStorePath: String)
+ tablePath: String)
(sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
if (schemaEvolutionEntry != null) {
thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
}
updateHiveMetaStore(newTableIdentifier,
oldTableIdentifier,
thriftTableInfo,
- carbonStorePath,
+ identifier.getStorePath,
sparkSession,
schemaConverter)
}
@@ -232,23 +143,20 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
newTableIdentifier.getDatabaseName,
newTableIdentifier.getTableName,
carbonStorePath)
- wrapperTableInfo.setStorePath(storePath)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, newTableIdentifier)
+ wrapperTableInfo.setStorePath(carbonStorePath)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier)
val schemaMetadataPath =
CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
val dbName = oldTableIdentifier.getDatabaseName
val tableName = oldTableIdentifier.getTableName
- val carbonUpdatedIdentifier = new CarbonTableIdentifier(dbName, tableName,
- wrapperTableInfo.getFactTable.getTableId)
val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive(
s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
- removeTableFromMetadata(wrapperTableInfo.getDatabaseName,
- wrapperTableInfo.getFactTable.getTableName)
+ removeTableFromMetadata(dbName, tableName)
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
- CarbonStorePath.getCarbonTablePath(storePath, carbonUpdatedIdentifier).getPath
+ CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath
}
/**
@@ -256,21 +164,23 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
*
* @param carbonTableIdentifier
* @param thriftTableInfo
- * @param carbonStorePath
* @param sparkSession
*/
override def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: format.TableInfo,
- carbonStorePath: String)
+ tablePath: String)
(sparkSession: SparkSession): String = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
evolutionEntries.remove(evolutionEntries.size() - 1)
updateHiveMetaStore(carbonTableIdentifier,
carbonTableIdentifier,
thriftTableInfo,
- carbonStorePath,
+ identifier.getStorePath,
sparkSession,
schemaConverter)
}
+
+
}