You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/04/29 11:24:44 UTC
[carbondata] branch master updated: [CARBONDATA-3778] Change
conf.getProperty("carbon.storelocation") to CarbonEvn.getDatabaseLocation
in mv module
This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new b3b8ca5 [CARBONDATA-3778] Change conf.getProperty("carbon.storelocation") to CarbonEvn.getDatabaseLocation in mv module
b3b8ca5 is described below
commit b3b8ca5c64faa9832ae25146cea57529c371c273
Author: liuzhi <37...@qq.com>
AuthorDate: Mon Apr 27 10:11:48 2020 +0800
[CARBONDATA-3778] Change conf.getProperty("carbon.storelocation") to CarbonEvn.getDatabaseLocation in mv module
Why is this PR needed?
Change the way of mv module get database location to recommend way.
Currently , the recommend way is using CarbonEvn.getDatabaseLocation,not using carbon.storelocation variable.
What changes were proposed in this PR?
Change all code which use carbon.storelocation variable to use CarbonEvn.getDatabaseLocation.
This closes #3727
---
.../org/apache/carbondata/core/view/MVManager.java | 16 ++--
.../apache/carbondata/core/view/MVProvider.java | 88 ++++++++--------------
.../apache/carbondata/view/MVManagerInSpark.scala | 4 +
.../command/view/CarbonCreateMVCommand.scala | 3 +-
4 files changed, 44 insertions(+), 67 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
index 5c68a3b..b2adc5f 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
@@ -48,8 +48,7 @@ public abstract class MVManager {
private static final Logger LOGGER =
LogServiceFactory.getLogService(MVManager.class.getName());
- private final MVProvider schemaProvider =
- MVProvider.get();
+ private final MVProvider schemaProvider = new MVProvider();
private volatile MVCatalog<?> catalog;
@@ -61,6 +60,8 @@ public abstract class MVManager {
public abstract List<String> getDatabases();
+ public abstract String getDatabaseLocation(String databaseName);
+
public boolean hasSchemaOnTable(CarbonTable table) throws IOException {
List<MVSchema> schemas = getSchemas();
for (MVSchema schema : schemas) {
@@ -145,7 +146,7 @@ public abstract class MVManager {
* @param viewName data map name
*/
public void deleteSchema(String databaseName, String viewName) throws IOException {
- schemaProvider.dropSchema(databaseName, viewName);
+ schemaProvider.dropSchema(this, databaseName, viewName);
}
/**
@@ -239,8 +240,7 @@ public abstract class MVManager {
*/
List<MVStatusDetail> getEnabledStatusDetails(String databaseName)
throws IOException {
- List<MVStatusDetail> statusDetails =
- schemaProvider.getStatusDetails(databaseName);
+ List<MVStatusDetail> statusDetails = schemaProvider.getStatusDetails(this, databaseName);
List<MVStatusDetail> enabledStatusDetails = new ArrayList<>(statusDetails.size());
for (MVStatusDetail statusDetail : statusDetails) {
if (statusDetail.getStatus() == MVStatus.ENABLED) {
@@ -255,14 +255,14 @@ public abstract class MVManager {
MVSchema schema = getSchema(
viewIdentifier.getDatabaseName(), viewIdentifier.getTableName());
if (schema != null) {
- schemaProvider.updateStatus(Collections.singletonList(schema), viewStatus);
+ schemaProvider.updateStatus(this, Collections.singletonList(schema), viewStatus);
}
}
public void setStatus(List<MVSchema> viewSchemas, MVStatus viewStatus)
throws IOException {
if (viewSchemas != null && !viewSchemas.isEmpty()) {
- schemaProvider.updateStatus(viewSchemas, viewStatus);
+ schemaProvider.updateStatus(this, viewSchemas, viewStatus);
}
}
@@ -271,7 +271,7 @@ public abstract class MVManager {
MVSchema viewSchema = getSchema(databaseName, viewName);
if (viewSchema != null) {
schemaProvider.updateStatus(
- Collections.singletonList(viewSchema), MVStatus.DROPPED);
+ this, Collections.singletonList(viewSchema), MVStatus.DROPPED);
}
}
diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
index 44e8616..524c31f 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
@@ -74,46 +74,25 @@ public class MVProvider {
private static final String STATUS_FILE_NAME = "mv_status";
- private final String storeLocation;
-
private final Map<String, SchemaProvider> schemaProviders = new ConcurrentHashMap<>();
- private MVProvider(String storeLocation) {
- this.storeLocation = storeLocation;
- }
-
- public static MVProvider get() {
- String storeLocation =
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION);
- if (storeLocation == null) {
- throw new RuntimeException(
- "Property [" + CarbonCommonConstants.STORE_LOCATION + "] is not set.");
- }
- return new MVProvider(storeLocation);
- }
-
private static String getSchemaPath(String schemaRoot, String viewName) {
return schemaRoot + CarbonCommonConstants.FILE_SEPARATOR + "mv_schema." + viewName;
}
- private SchemaProvider getSchemaProvider(String databaseName) {
+ private SchemaProvider getSchemaProvider(MVManager viewManager, String databaseName) {
String databaseNameUpper = databaseName.toUpperCase();
SchemaProvider schemaProvider = this.schemaProviders.get(databaseNameUpper);
if (schemaProvider == null) {
synchronized (this.schemaProviders) {
schemaProvider = this.schemaProviders.get(databaseNameUpper);
if (schemaProvider == null) {
- String databaseLocation;
- if (databaseNameUpper.equalsIgnoreCase(CarbonCommonConstants.DATABASE_DEFAULT_NAME)) {
- databaseLocation = CarbonUtil.checkAndAppendHDFSUrl(this.storeLocation);
- } else {
- databaseLocation = CarbonUtil.checkAndAppendHDFSUrl(this.storeLocation +
- CarbonCommonConstants.FILE_SEPARATOR + databaseName + ".db");
- }
- if (!FileFactory.getCarbonFile(databaseLocation).exists()) {
+ String databaseLocation = viewManager.getDatabaseLocation(databaseName);
+ CarbonFile databasePath = FileFactory.getCarbonFile(databaseLocation);
+ if (!databasePath.exists()) {
return null;
}
- schemaProvider = new SchemaProvider(databaseLocation);
+ schemaProvider = new SchemaProvider(databasePath.getCanonicalPath());
this.schemaProviders.put(databaseNameUpper, schemaProvider);
}
}
@@ -121,18 +100,18 @@ public class MVProvider {
return schemaProvider;
}
- public MVSchema getSchema(MVManager viewManager,
- String databaseName, String viewName) throws IOException {
- SchemaProvider schemaProvider = this.getSchemaProvider(databaseName);
+ public MVSchema getSchema(MVManager viewManager, String databaseName,
+ String viewName) throws IOException {
+ SchemaProvider schemaProvider = this.getSchemaProvider(viewManager, databaseName);
if (schemaProvider == null) {
return null;
}
return schemaProvider.retrieveSchema(viewManager, viewName);
}
- List<MVSchema> getSchemas(MVManager viewManager,
- String databaseName, CarbonTable carbonTable) throws IOException {
- SchemaProvider schemaProvider = this.getSchemaProvider(databaseName);
+ List<MVSchema> getSchemas(MVManager viewManager, String databaseName,
+ CarbonTable carbonTable) throws IOException {
+ SchemaProvider schemaProvider = this.getSchemaProvider(viewManager, databaseName);
if (schemaProvider == null) {
return Collections.emptyList();
} else {
@@ -140,9 +119,8 @@ public class MVProvider {
}
}
- List<MVSchema> getSchemas(MVManager viewManager,
- String databaseName) throws IOException {
- SchemaProvider schemaProvider = this.getSchemaProvider(databaseName);
+ List<MVSchema> getSchemas(MVManager viewManager, String databaseName) throws IOException {
+ SchemaProvider schemaProvider = this.getSchemaProvider(viewManager, databaseName);
if (schemaProvider == null) {
return Collections.emptyList();
} else {
@@ -152,16 +130,16 @@ public class MVProvider {
void saveSchema(MVManager viewManager, String databaseName,
MVSchema viewSchema) throws IOException {
- SchemaProvider schemaProvider = this.getSchemaProvider(databaseName);
+ SchemaProvider schemaProvider = this.getSchemaProvider(viewManager, databaseName);
if (schemaProvider == null) {
throw new IOException("Database [" + databaseName + "] is not found.");
}
schemaProvider.saveSchema(viewManager, viewSchema);
}
- public void dropSchema(String databaseName, String viewName)
- throws IOException {
- SchemaProvider schemaProvider = this.getSchemaProvider(databaseName);
+ public void dropSchema(MVManager viewManager, String databaseName,
+ String viewName)throws IOException {
+ SchemaProvider schemaProvider = this.getSchemaProvider(viewManager, databaseName);
if (schemaProvider == null) {
throw new IOException("Materialized view with name " + databaseName + "." + viewName +
" does not exists in storage");
@@ -169,22 +147,16 @@ public class MVProvider {
schemaProvider.dropSchema(viewName);
}
- private String getStatusFileName(String databaseName) {
- if (databaseName.equalsIgnoreCase("default")) {
- return this.storeLocation +
- CarbonCommonConstants.FILE_SEPARATOR + "_system" +
- CarbonCommonConstants.FILE_SEPARATOR + STATUS_FILE_NAME;
- } else {
- return this.storeLocation +
- CarbonCommonConstants.FILE_SEPARATOR + databaseName + ".db" +
- CarbonCommonConstants.FILE_SEPARATOR + "_system" +
- CarbonCommonConstants.FILE_SEPARATOR + STATUS_FILE_NAME;
- }
+ private String getStatusFileName(MVManager viewManager, String databaseName) {
+ String databaseLocation = viewManager.getDatabaseLocation(databaseName);
+ return FileFactory.getCarbonFile(databaseLocation).getCanonicalPath() +
+ CarbonCommonConstants.FILE_SEPARATOR + "_system" +
+ CarbonCommonConstants.FILE_SEPARATOR + STATUS_FILE_NAME;
}
- public List<MVStatusDetail> getStatusDetails(String databaseName)
+ public List<MVStatusDetail> getStatusDetails(MVManager viewManager, String databaseName)
throws IOException {
- String statusPath = this.getStatusFileName(databaseName);
+ String statusPath = this.getStatusFileName(viewManager, databaseName);
Gson gsonObjectToRead = new Gson();
DataInputStream dataInputStream = null;
BufferedReader buffReader = null;
@@ -228,7 +200,7 @@ public class MVProvider {
* @param schemaList schemas of which are need to be updated in mv status
* @param status status to be updated for the mv schemas
*/
- public void updateStatus(List<MVSchema> schemaList,
+ public void updateStatus(MVManager viewManager, List<MVSchema> schemaList,
MVStatus status) throws IOException {
if (schemaList == null || schemaList.size() == 0) {
// There is nothing to update
@@ -245,11 +217,11 @@ public class MVProvider {
schemas.add(schema);
}
for (Map.Entry<String, List<MVSchema>> entry : schemasMapByDatabase.entrySet()) {
- this.updateStatus(entry.getKey(), entry.getValue(), status);
+ this.updateStatus(viewManager, entry.getKey(), entry.getValue(), status);
}
}
- private void updateStatus(String databaseName, List<MVSchema> schemaList,
+ private void updateStatus(MVManager viewManager, String databaseName, List<MVSchema> schemaList,
MVStatus status) throws IOException {
ICarbonLock carbonTableStatusLock = getStatusLock(databaseName);
boolean locked = false;
@@ -264,7 +236,7 @@ public class MVProvider {
}
}
List<MVStatusDetail> statusDetailList =
- new ArrayList<>(getStatusDetails(databaseName));
+ new ArrayList<>(getStatusDetails(viewManager, databaseName));
List<MVStatusDetail> changedStatusDetails = new ArrayList<>();
List<MVStatusDetail> newStatusDetails = new ArrayList<>();
for (MVSchema schema : schemaList) {
@@ -292,9 +264,9 @@ public class MVProvider {
statusDetailList.removeAll(changedStatusDetails);
}
writeLoadDetailsIntoFile(
- this.getStatusFileName(databaseName),
+ this.getStatusFileName(viewManager, databaseName),
statusDetailList.toArray(
- new MVStatusDetail[statusDetailList.size()]));
+ new MVStatusDetail[statusDetailList.size()]));
} else {
String errorMsg = "Updating MV status is failed due to another process taken the lock"
+ " for updating it";
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
index 04102c3..430d2fd 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
@@ -40,6 +40,10 @@ class MVManagerInSpark(session: SparkSession) extends MVManager {
CarbonUtils.threadUnset(CarbonCommonConstants.DISABLE_SQL_REWRITE)
}
}
+
+ override def getDatabaseLocation(databaseName: String): String = {
+ CarbonEnv.getDatabaseLocation(databaseName, session)
+ }
}
object MVManagerInSpark {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
index eb9cc2f..96ac24f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
@@ -321,7 +321,8 @@ case class CarbonCreateMVCommand(
ifExistsSet = true,
Option(schema.getIdentifier.getDatabaseName),
schema.getIdentifier.getTableName,
- dropChildTable = true)
+ dropChildTable = true,
+ isInternalCall = true)
dropTableCommand.run(session)
throw exception
}