You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/04 12:35:25 UTC
carbondata git commit: [CARBONDATA-2986] Table Properties are lost
when multiple driver concurrently
Repository: carbondata
Updated Branches:
refs/heads/master 11bd0ade9 -> 3edea12a8
[CARBONDATA-2986] Table Properties are lost when multiple driver concurrently
Issue :- When multiple drivers create the same table concurrently, the table properties of that table are lost.
Root Cause :- The schema file is overwritten from CarbonRelation#createTableIfNotExists because the lookup of the table fails. This happens because, under concurrency, the .mdt file is updated and the current table is removed from the cache in org.apache.spark.sql.hive.CarbonFileMetastore#checkSchemasModifiedTimeAndReloadTable.
Solution :- Since the carbon table has already been created and its schema file already written, there is no need to perform the lookup again.
This closes #2785
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3edea12a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3edea12a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3edea12a
Branch: refs/heads/master
Commit: 3edea12a83e70dddb1eca271bf5660f73de272f5
Parents: 11bd0ad
Author: BJangir <ba...@gmail.com>
Authored: Fri Sep 28 17:17:30 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Oct 4 18:05:06 2018 +0530
----------------------------------------------------------------------
.../scala/org/apache/spark/sql/CarbonSource.scala | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3edea12a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 16cee96..cd1087d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -57,6 +57,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
with SchemaRelationProvider with StreamSinkProvider with DataSourceRegister {
override def shortName(): String = "carbondata"
+ private val LOGGER = LogServiceFactory.getLogService(CarbonSource.getClass.getName)
// will be called if hive supported create table command is provided
override def createRelation(sqlContext: SQLContext,
@@ -143,7 +144,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
.exists(_.table.equalsIgnoreCase(tableName))) {
getPathForTable(sqlContext.sparkSession, dbName, tableName, newParameters)
} else {
- createTableIfNotExists(sqlContext.sparkSession, newParameters, dataSchema)
+ createTableIfNotExists(sqlContext.sparkSession, dbName, tableName, newParameters, dataSchema)
}
CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), updatedParams,
@@ -160,6 +161,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
private def createTableIfNotExists(
sparkSession: SparkSession,
+ dbName: String,
+ tableName: String,
parameters: Map[String, String],
dataSchema: StructType): (String, Map[String, String]) = {
@@ -167,10 +170,18 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
val tableName: String = parameters.getOrElse("tableName", "").toLowerCase
try {
- val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
- (carbonTable.getTablePath, parameters)
+ if (!(parameters.contains("carbonSchemaPartsNo")
+ || parameters.contains("carbonschemapartsno"))) {
+ val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+ (carbonTable.getTablePath, parameters)
+ } else {
+ (getPathForTable(sparkSession, dbName, tableName, parameters))
+ }
+
} catch {
case _: NoSuchTableException =>
+ LOGGER.warn("Carbon Table [" +dbName +"] [" +tableName +"] is not found, " +
+ "Now existing Schema will be overwritten with default properties")
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val identifier = AbsoluteTableIdentifier.from(
CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession),