You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/07/14 13:35:37 UTC

[carbondata] branch master updated: [CARBONDATA-3898] Support Option 'carbon.enable.mv'

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a7a213  [CARBONDATA-3898] Support Option 'carbon.enable.mv'
4a7a213 is described below

commit 4a7a2130bbc9bb3f9b49cd860c787fd2b9f65baa
Author: haomarch <ma...@126.com>
AuthorDate: Mon Jul 13 02:08:21 2020 +0800

    [CARBONDATA-3898] Support Option 'carbon.enable.mv'
    
    Why is this PR needed?
    When MV is enabled, SQL rewrite takes a lot of time. A new option,
    'carbon.enable.mv', shall be supported, which turns off
    SQL rewrite when the configured value is false.
    
    What changes were proposed in this PR?
    Add the option 'carbon.enable.mv'; the logical plan won't be
    changed if the configured value is false.
    
    This closes #3839
---
 .../core/constants/CarbonCommonConstants.java      | 14 ++++---
 .../carbondata/core/util/CarbonProperties.java     | 27 ++++++++++++
 .../apache/carbondata/core/util/SessionParams.java |  4 +-
 .../core/CarbonPropertiesValidationTest.java       | 16 +++++++
 docs/configuration-parameters.md                   |  1 +
 .../apache/carbondata/view/MVManagerInSpark.scala  |  4 +-
 .../command/management/CarbonLoadDataCommand.scala |  2 +-
 .../apache/spark/sql/optimizer/MVRewriteRule.scala | 12 ++----
 .../scala/org/apache/carbondata/view/MVTest.scala  | 49 +++++++++++++++++++++-
 9 files changed, 107 insertions(+), 22 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 4a0605e..d5808ac 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1220,6 +1220,14 @@ public final class CarbonCommonConstants {
   public static final String CARBON_INPUT_SEGMENTS = "carbon.input.segments.";
 
   /**
+   * Materialized view thread context properties
+   */
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String CARBON_ENABLE_MV = "carbon.enable.mv";
+
+  public static final String CARBON_ENABLE_MV_DEFAULT = "true";
+
+  /**
    * ENABLE_QUERY_STATISTICS
    */
   @CarbonProperty
@@ -2434,12 +2442,6 @@ public final class CarbonCommonConstants {
   public static final String INDEX_STATUS = "index_status";
 
   /**
-   * Materialized view thread context properties
-   */
-  @CarbonProperty
-  public static final String DISABLE_SQL_REWRITE = "disable_sql_rewrite";
-
-  /**
    * property which defines the insert stage flow
    */
   public static final String IS_INSERT_STAGE = "is_insert_stage";
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 90409e8..cc1af6b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -250,6 +250,7 @@ public final class CarbonProperties {
     validateNumberOfColumnPerIORead();
     validateEnableUnsafeSort();
     validateEnableOffHeapSort();
+    validateEnableMV();
     validateCustomBlockDistribution();
     validateEnableVectorReader();
     validateLockType();
@@ -501,6 +502,19 @@ public final class CarbonProperties {
     }
   }
 
+  private void validateEnableMV() {
+    String isMVEnabled = carbonProperties.getProperty(CarbonCommonConstants.CARBON_ENABLE_MV);
+    if (!CarbonUtil.validateBoolean(isMVEnabled)) {
+      LOGGER.warn(String.format("The enable mv value \"%s\" is invalid. " +
+              "Using the default value \"%s\"",
+              isMVEnabled,
+          CarbonCommonConstants.CARBON_ENABLE_MV_DEFAULT
+      ));
+      carbonProperties.setProperty(CarbonCommonConstants.CARBON_ENABLE_MV,
+          CarbonCommonConstants.CARBON_ENABLE_MV_DEFAULT);
+    }
+  }
+
   private void validateEnableUnsafeSort() {
     String unSafeSortStr = carbonProperties.getProperty(ENABLE_UNSAFE_SORT);
     if (unSafeSortStr == null) {
@@ -1763,6 +1777,19 @@ public final class CarbonProperties {
   }
 
   /**
+   * Check whether the MV is enabled by the user or not.
+   */
+  public boolean isMVEnabled() {
+    String mvEnabled = CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.CARBON_ENABLE_MV);
+    if (mvEnabled == null || !CarbonUtil.validateBoolean(mvEnabled)) {
+      return Boolean.parseBoolean(CarbonCommonConstants.CARBON_ENABLE_MV_DEFAULT);
+    } else {
+      return mvEnabled.equalsIgnoreCase("true");
+    }
+  }
+
+  /**
    * Check if user has enabled/disabled the use of pre-priming for index server
    */
   public boolean isIndexServerPrePrimingEnabled() {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 155c543..36508c4 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -31,11 +31,11 @@ import org.apache.carbondata.core.exception.InvalidConfigurationException;
 
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_ENABLE_MV;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_PUSH_ROW_FILTERS_FOR_VECTOR;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_QUERY_STAGE_INPUT;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.DISABLE_SQL_REWRITE;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_OFFHEAP_SORT;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_SI_LOOKUP_PARTIALSTRING;
@@ -155,7 +155,7 @@ public class SessionParams implements Serializable, Cloneable {
       case CARBON_PUSH_ROW_FILTERS_FOR_VECTOR:
       case CARBON_ENABLE_INDEX_SERVER:
       case CARBON_QUERY_STAGE_INPUT:
-      case DISABLE_SQL_REWRITE:
+      case CARBON_ENABLE_MV:
         isValid = CarbonUtil.validateBoolean(value);
         if (!isValid) {
           throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
diff --git a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
index fbc48e9..2778f10 100644
--- a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
@@ -66,6 +66,22 @@ public class CarbonPropertiesValidationTest extends TestCase {
         CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT.equalsIgnoreCase(valueAfterValidation));
   }
 
+  @Test public void testValidateEnableMV()
+      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    Method validateMethodType =
+        carbonProperties.getClass().getDeclaredMethod("validateEnableMV");
+    validateMethodType.setAccessible(true);
+    carbonProperties.addProperty(CarbonCommonConstants.CARBON_ENABLE_MV, "xyz");
+    String valueBeforeValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.CARBON_ENABLE_MV);
+    validateMethodType.invoke(carbonProperties);
+    String valueAfterValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.CARBON_ENABLE_MV);
+    assertTrue(valueBeforeValidation.equals("xyz"));
+    assertTrue(
+        CarbonCommonConstants.CARBON_ENABLE_MV_DEFAULT.equalsIgnoreCase(valueAfterValidation));
+  }
+
   @Test public void testValidateEnableOffHeapSort()
       throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
     Method validateMethodType =
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index a1baaaf..7e4e153 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -128,6 +128,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.scheduler.min.registered.resources.ratio | 0.8 | Specifies the minimum resource (executor) ratio needed for starting the block distribution. The default value is 0.8, which indicates 80% of the requested resource is allocated for starting block distribution. The minimum value is 0.1 min and the maximum value is 1.0. |
 | carbon.detail.batch.size | 100 | The buffer size to store records, returned from the block scan. In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12 [...]
 | carbon.enable.vector.reader | true | Spark added vector processing to optimize cpu cache miss and there by increase the query performance. This configuration enables to fetch data as columnar batch of size 4*1024 rows instead of fetching data row by row and provide it to spark so that there is improvement in  select queries performance. |
+| carbon.enable.mv | true | Whether to rewrite the query plan based on materialized views. The default value is true. |
 | carbon.task.distribution | block | CarbonData has its own scheduling algorithm to suggest to Spark on how many tasks needs to be launched and how much work each task need to do in a Spark cluster for any query on CarbonData. Each of these task distribution suggestions has its own advantages and disadvantages. Based on the customer use case, appropriate task distribution can be configured.**block**: Setting this value will launch one task per block. This setting is suggested in case of  [...]
 | carbon.custom.block.distribution | false | CarbonData has its own scheduling algorithm to suggest to Spark on how many tasks needs to be launched and how much work each task need to do in a Spark cluster for any query on CarbonData. When this configuration is true, CarbonData would distribute the available blocks to be scanned among the available number of cores. For Example:If there are 10 blocks to be scanned and only 3 tasks can be run(only 3 executor cores available in the cluster) [...]
 | enable.query.statistics | false | CarbonData has extensive logging which would be useful for debugging issues related to performance or hard to locate issues. This configuration when made ***true*** would log additional query statistics information to more accurately locate the issues being debugged. **NOTE:** Enabling this would log more debug information to log files, there by increasing the log files size significantly in short span of time. It is advised to configure the log files  [...]
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
index 430d2fd..8bd4e86 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
@@ -28,7 +28,7 @@ import org.apache.carbondata.core.view.MVManager
 
 class MVManagerInSpark(session: SparkSession) extends MVManager {
   override def getDatabases: util.List[String] = {
-    CarbonUtils.threadSet(CarbonCommonConstants.DISABLE_SQL_REWRITE, "true")
+    CarbonUtils.threadSet(CarbonCommonConstants.CARBON_ENABLE_MV, "true")
     try {
       val databaseList = session.catalog.listDatabases()
       val databaseNameList = new util.ArrayList[String]()
@@ -37,7 +37,7 @@ class MVManagerInSpark(session: SparkSession) extends MVManager {
       }
       databaseNameList
     } finally {
-      CarbonUtils.threadUnset(CarbonCommonConstants.DISABLE_SQL_REWRITE)
+      CarbonUtils.threadUnset(CarbonCommonConstants.CARBON_ENABLE_MV)
     }
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 8244848..e50f575 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -187,7 +187,7 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String],
         LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
         throw new AnalysisException(preEventEx.getMessage)
       case ex: Exception =>
-        LOGGER.error(ex)
+        LOGGER.error(ex.getMessage, ex)
         // update the load entry in table status file for changing the status to marked for delete
         if (isUpdateTableStatusRequired) {
           CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
index f475a2b..db16767 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.ThreadLocalSessionInfo
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.view.{MVCatalog, MVCatalogFactory}
 import org.apache.carbondata.mv.plans.modular.{ModularPlan, Select}
 import org.apache.carbondata.view.{MVCatalogInSpark, MVManagerInSpark, MVSchemaWrapper}
@@ -98,14 +98,8 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
     if (!canApply) {
       return logicalPlan
     }
-    val sessionInformation = ThreadLocalSessionInfo.getCarbonSessionInfo
-    if (sessionInformation != null && sessionInformation.getThreadParams != null) {
-      val disableViewRewrite = sessionInformation.getThreadParams.getProperty(
-        CarbonCommonConstants.DISABLE_SQL_REWRITE)
-      if (disableViewRewrite != null &&
-        disableViewRewrite.equalsIgnoreCase("true")) {
-        return logicalPlan
-      }
+    if (!CarbonProperties.getInstance().isMVEnabled) {
+      return logicalPlan
     }
     // when first time MVCatalogs are initialized, it stores session info also,
     // but when carbon session is newly created, catalog map will not be cleared,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/MVTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/MVTest.scala
index 98c0130..bf9ac6b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/MVTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/MVTest.scala
@@ -24,9 +24,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class MVTest extends QueryTest with BeforeAndAfterAll {
@@ -63,6 +62,52 @@ class MVTest extends QueryTest with BeforeAndAfterAll {
     sql("drop table source")
   }
 
+  test("test disable mv with carbonproperties and sessionparam") {
+    //1. Prepare the source table and MV, make sure the MV is enabled
+    sql("drop materialized view if exists mv1")
+    sql("drop table if exists source")
+    sql("create table source as select * from fact_table")
+    sql("create materialized view mv1 as select empname, deptname, avg(salary) from source group by empname, deptname")
+    var df = sql("select empname, avg(salary) from source group by empname")
+    assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "mv1"))
+
+    //2.  test disable mv with carbon.properties
+    // 2.1 disable MV when set carbon.enable.mv = false in the carbonproperties
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_MV,"false")
+    df = sql("select empname, avg(salary) from source group by empname")
+    assert(!isTableAppearedInPlan(df.queryExecution.optimizedPlan, "mv1"))
+
+    // 2.2 enable MV when configured value is invalid
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_MV,"invalidvalue")
+    df = sql("select empname, avg(salary) from source group by empname")
+    assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "mv1"))
+
+    // 2.3 enable mv when set carbon.enable.mv = true in the carbonproperties
+    df = sql("select empname, avg(salary) from source group by empname")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_MV,"true")
+    assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "mv1"))
+
+    //3.  test disable mv with sessionparam
+    // 3.1 disable MV when set carbon.enable.mv = false in the sessionparam
+    sql("set carbon.enable.mv = false")
+    df = sql("select empname, avg(salary) from source group by empname")
+    assert(!isTableAppearedInPlan(df.queryExecution.optimizedPlan, "mv1"))
+
+    // 3.2 validate configured sessionparam
+    val exMessage = intercept[Exception] {
+      sql("set carbon.enable.mv = invalidvalue")
+    }
+    assert(exMessage.getMessage.contains("Invalid value invalidvalue for key carbon.enable.mv"))
+
+    // 3.3 enable mv when set carbon.enable.mv = true in the sessionparam
+    sql("set carbon.enable.mv = true")
+    df = sql("select empname, avg(salary) from source group by empname")
+    assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "mv1"))
+
+    ThreadLocalSessionInfo.getCarbonSessionInfo.
+      getSessionParams.removeProperty(CarbonCommonConstants.CARBON_ENABLE_MV)
+  }
+
   test("test create mv on orc table") {
     sql("drop materialized view if exists mv1")
     sql("drop table if exists source")