You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/03 16:02:20 UTC

carbondata git commit: [CARBONDATA-2101]Restrict direct query on pre aggregate and timeseries datamap

Repository: carbondata
Updated Branches:
  refs/heads/master 46d9bf966 -> 349be007f


[CARBONDATA-2101]Restrict direct query on pre aggregate and timeseries datamap

Restrict direct queries on pre-aggregate and timeseries datamap tables.
Added a property so that direct queries on a datamap can still be run for testing purposes
(validation is enabled by default): carbon.query.validate.directqueryondatamap=false

This closes #1888


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/349be007
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/349be007
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/349be007

Branch: refs/heads/master
Commit: 349be007fd20fb8c4a39b318e45b47445d2e798c
Parents: 46d9bf9
Author: kumarvishal <ku...@gmail.com>
Authored: Tue Jan 30 20:54:12 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Feb 3 21:32:08 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 10 +++++
 .../carbondata/core/util/SessionParams.java     |  2 +
 .../spark/sql/common/util/QueryTest.scala       |  4 ++
 .../apache/spark/sql/test/util/QueryTest.scala  |  3 ++
 .../spark/rdd/AggregateDataMapCompactor.scala   |  2 +
 .../sql/CarbonDatasourceHadoopRelation.scala    |  1 +
 .../scala/org/apache/spark/sql/CarbonEnv.scala  | 18 +++++++++
 .../preaaggregate/PreAggregateUtil.scala        |  2 +
 .../sql/hive/CarbonPreAggregateRules.scala      |  9 +++++
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 40 +++++++++++++++++++-
 10 files changed, 89 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index a799e51..6e6482d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1588,6 +1588,16 @@ public final class CarbonCommonConstants {
       "carbon.sort.storage.inmemory.size.inmb";
   public static final String IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT = "512";
 
+  @CarbonProperty
+  public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP =
+      "carbon.query.directQueryOnDataMap.enabled";
+  public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "false";
+
+  @CarbonProperty
+  public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP =
+      "carbon.query.validate.directqueryondatamap";
+  public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "true";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index ddc7539..a6ff61e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -199,6 +199,8 @@ public class SessionParams implements Serializable {
           }
         } else if (key.startsWith(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS)) {
           isValid = true;
+        } else if (key.equalsIgnoreCase(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP)) {
+          isValid = true;
         } else {
           throw new InvalidConfigurationException(
               "The key " + key + " not supported for dynamic configuration.");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index d80efb8..9c5bc38 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -33,7 +33,9 @@ import org.apache.spark.sql.test.{ResourceRegisterAndCopier, TestQueryExecutor}
 import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SQLContext}
 import org.scalatest.Suite
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
 
 class QueryTest extends PlanTest with Suite {
 
@@ -43,6 +45,8 @@ class QueryTest extends PlanTest with Suite {
 
   // Add Locale setting
   Locale.setDefault(Locale.US)
+  CarbonProperties.getInstance()
+    .addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false")
 
   /**
    * Runs the plan and makes sure the answer contains all of the keywords, or the

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
index b87473a..6e5630a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
@@ -41,6 +41,9 @@ class QueryTest extends PlanTest {
   // Add Locale setting
   Locale.setDefault(Locale.US)
 
+  CarbonProperties.getInstance()
+    .addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false")
+
   /**
    * Runs the plan and makes sure the answer contains all of the keywords, or the
    * none of keywords are listed in the answer

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index 188e776..c8a6b1d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -70,6 +70,8 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
         loadCommand.dataFrame =
                   Some(PreAggregateUtil.getDataFrame(
                     sqlContext.sparkSession, loadCommand.logicalPlan.get))
+        CarbonSession.threadSet(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
+          "true")
         loadCommand.processData(sqlContext.sparkSession)
         val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
           carbonTable.getMetaDataFilepath, uuid)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 39a0d1e..0978fab 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -74,6 +74,7 @@ case class CarbonDatasourceHadoopRelation(
 
     val projection = new CarbonProjection
     requiredColumns.foreach(projection.addColumn)
+    CarbonSession.threadUnset(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP)
     val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
     new CarbonScanRDD(
       sparkSession,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 6b12008..8444d25 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -265,4 +265,22 @@ object CarbonEnv {
       tableName)
   }
 
+  def getThreadParam(key: String, defaultValue: String) : String = {
+    // Thread-level lookup; defaultValue is returned when no CarbonSessionInfo is
+    // bound to the current thread.
+    Option(ThreadLocalSessionInfo.getCarbonSessionInfo) match {
+      case Some(sessionInfo) => sessionInfo.getThreadParams.getProperty(key, defaultValue)
+      case None => defaultValue
+    }
+  }
+
+  def getSessionParam(key: String, defaultValue: String) : String = {
+    // Session-scoped counterpart of getThreadParam: the value must come from
+    // getSessionParams (the thread-level params are what getThreadParam reads).
+    // defaultValue is returned when no session info is bound to this thread.
+    Option(ThreadLocalSessionInfo.getCarbonSessionInfo)
+      .map(_.getSessionParams.getProperty(key, defaultValue))
+      .getOrElse(defaultValue)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 1d4ebec..ae9bc9b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -599,6 +599,8 @@ object PreAggregateUtil {
       CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
       parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
       parentTableIdentifier.table, validateSegments.toString)
+    CarbonSession.threadSet(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
+      "true")
     CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
     try {
       loadCommand.processData(sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index de58805..7b4bc0d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -259,6 +259,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * @return transformed plan
    */
   def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
+    var isPlanUpdated = false
     val updatedPlan = logicalPlan.transform {
       case agg@Aggregate(
         grExp,
@@ -294,6 +295,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
                   childPlan,
                   carbonTable,
                   agg)
+              isPlanUpdated = true
               Aggregate(updatedGroupExp,
                 updatedAggExp,
                 CarbonReflectionUtils
@@ -346,6 +348,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
                   childPlan,
                   carbonTable,
                   agg)
+              isPlanUpdated = true
               Aggregate(updatedGroupExp,
                 updatedAggExp,
                 CarbonReflectionUtils
@@ -401,6 +404,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
                   childPlan,
                   carbonTable,
                   agg)
+              isPlanUpdated = true
               Aggregate(updatedGroupExp,
                 updatedAggExp,
                 Filter(updatedFilterExpression.get,
@@ -461,6 +465,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
                   childPlan,
                   carbonTable,
                   agg)
+              isPlanUpdated = true
               Aggregate(updatedGroupExp,
                 updatedAggExp,
                 Filter(updatedFilterExpression.get,
@@ -481,6 +486,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         }
 
     }
+    if(isPlanUpdated) {
+      CarbonSession.threadSet(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
+        "true")
+    }
     updatedPlan
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 06ad0ad..0aa7514 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType}
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.stats.QueryStatistic
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
 
@@ -66,7 +66,8 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   def checkIfRuleNeedToBeApplied(plan: LogicalPlan, removeSubQuery: Boolean = false): Boolean = {
-    relations = collectCarbonRelation(plan);
+    relations = collectCarbonRelation(plan)
+    validateQueryDirectlyOnDataMap(relations)
     if (relations.nonEmpty && !isOptimized(plan)) {
       // In case scalar subquery skip the transformation and update the flag.
       if (relations.exists(_.carbonRelation.isSubquery.nonEmpty)) {
@@ -87,6 +88,41 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
+  /**
+   * Guards against a query being fired directly on a child data map table.
+   *
+   * The guard applies only while VALIDATE_DIRECT_QUERY_ON_DATAMAP resolves to true and the
+   * current thread has session info; SUPPORT_DIRECT_QUERY_ON_DATAMAP=true bypasses it.
+   *
+   * @param relations all relations from query
+   */
+  def validateQueryDirectlyOnDataMap(relations: Seq[CarbonDecoderRelation]): Unit = {
+    // does the query read from at least one child data map table
+    val readsChildDataMap = relations.exists { decoderRelation =>
+      decoderRelation.carbonRelation.carbonTable.isChildDataMap
+    }
+    // resolved unconditionally, even when no data map relation was found above
+    val validationEnabled = CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP,
+        CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean
+    val directQueryRejected =
+      if (!validationEnabled || !readsChildDataMap) {
+        false
+      } else if (null == ThreadLocalSessionInfo.getCarbonSessionInfo) {
+        // no session info on this thread: the query is let through unchecked
+        false
+      } else {
+        // thread-level switch that permits reading the data map table
+        val directQueryAllowed = CarbonEnv.getThreadParam(
+          CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
+          CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean
+        !directQueryAllowed
+      }
+    if (directQueryRejected) {
+      throw new AnalysisException("Query On DataMap not supported")
+    }
+  }
+
   private def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = {
     plan collect {
       case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>