You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/08/09 18:26:01 UTC
[17/47] carbondata git commit: [HOTFIX][PR 2575] Fixed modular plan
creation only if valid datamaps are available
[HOTFIX][PR 2575] Fixed modular plan creation only if valid datamaps are available
The update query fails in a spark-2.2 cluster when the MV jars are available, because the catalogs
are not empty if datamaps were created for other tables as well, causing isValidPlan() inside MVAnalyzerRule to return true.
This closes #2579
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/738f34b6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/738f34b6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/738f34b6
Branch: refs/heads/branch-1.4
Commit: 738f34b681f777bf2f7b773361be794122cf1302
Parents: 4a11ca6
Author: ravipesala <ra...@gmail.com>
Authored: Mon Jul 30 15:00:00 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Aug 9 23:42:44 2018 +0530
----------------------------------------------------------------------
.../carbondata/core/datamap/DataMapCatalog.java | 4 +-
.../carbondata/mv/datamap/MVAnalyzerRule.scala | 57 ++++++++++++++++----
.../mv/rewrite/SummaryDatasetCatalog.scala | 9 +++-
.../mv/rewrite/MVCreateTestCase.scala | 4 ++
4 files changed, 60 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/738f34b6/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
index 89f2838..5dd4871 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
@@ -38,10 +38,10 @@ public interface DataMapCatalog<T> {
void unregisterSchema(String dataMapName);
/**
- * List all registered schema catalogs
+ * List all registered valid schema catalogs
* @return
*/
- T[] listAllSchema();
+ T[] listAllValidSchema();
/**
* It reloads/removes all registered schema catalogs
http://git-wip-us.apache.org/repos/asf/carbondata/blob/738f34b6/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
index 483780f..9e0f8e5 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
@@ -16,8 +16,11 @@
*/
package org.apache.carbondata.mv.datamap
+import scala.collection.JavaConverters._
+
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, ScalaUDF}
import org.apache.spark.sql.catalyst.plans.logical.{Command, DeserializeToObject, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -79,27 +82,59 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
}
}
+ /**
+ * Whether the plan is valid for doing modular plan matching and datamap replacing.
+ */
def isValidPlan(plan: LogicalPlan, catalog: SummaryDatasetCatalog): Boolean = {
- !plan.isInstanceOf[Command] && !isDataMapExists(plan, catalog.listAllSchema()) &&
- !plan.isInstanceOf[DeserializeToObject]
+ if (!plan.isInstanceOf[Command] && !plan.isInstanceOf[DeserializeToObject]) {
+ val catalogs = extractCatalogs(plan)
+ !isDataMapReplaced(catalog.listAllValidSchema(), catalogs) &&
+ isDataMapExists(catalog.listAllValidSchema(), catalogs)
+ } else {
+ false
+ }
+
}
/**
* Check whether datamap table already updated in the query.
*
- * @param plan
- * @param mvs
- * @return
+ * @param mvdataSetArray Array of available mvdataset which include modular plans
+ * @return Boolean whether already datamap replaced in the plan or not
*/
- def isDataMapExists(plan: LogicalPlan, mvs: Array[SummaryDataset]): Boolean = {
- val catalogs = plan collect {
- case l: LogicalRelation => l.catalogTable
- }
- catalogs.isEmpty || catalogs.exists { c =>
- mvs.exists { mv =>
+ def isDataMapReplaced(
+ mvdataSetArray: Array[SummaryDataset],
+ catalogs: Seq[Option[CatalogTable]]): Boolean = {
+ catalogs.exists { c =>
+ mvdataSetArray.exists { mv =>
val identifier = mv.dataMapSchema.getRelationIdentifier
identifier.getTableName.equals(c.get.identifier.table) &&
identifier.getDatabaseName.equals(c.get.database)
}
}
}
+
+ /**
+ * Check whether any suitable datamaps(like datamap which parent tables are present in the plan)
+ * exists for this plan.
+ *
+ * @param mvs
+ * @return
+ */
+ def isDataMapExists(mvs: Array[SummaryDataset], catalogs: Seq[Option[CatalogTable]]): Boolean = {
+ catalogs.exists { c =>
+ mvs.exists { mv =>
+ mv.dataMapSchema.getParentTables.asScala.exists { identifier =>
+ identifier.getTableName.equals(c.get.identifier.table) &&
+ identifier.getDatabaseName.equals(c.get.database)
+ }
+ }
+ }
+ }
+
+ private def extractCatalogs(plan: LogicalPlan): Seq[Option[CatalogTable]] = {
+ val catalogs = plan collect {
+ case l: LogicalRelation => l.catalogTable
+ }
+ catalogs
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/738f34b6/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
index 210ff65..026d6b7 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -152,7 +152,14 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
}
- override def listAllSchema(): Array[SummaryDataset] = summaryDatasets.toArray
+ override def listAllValidSchema(): Array[SummaryDataset] = {
+ val statusDetails = DataMapStatusManager.getEnabledDataMapStatusDetails
+ // Only select the enabled datamaps for the query.
+ val enabledDataSets = summaryDatasets.filter { p =>
+ statusDetails.exists(_.getDataMapName.equalsIgnoreCase(p.dataMapSchema.getDataMapName))
+ }
+ enabledDataSets.toArray
+ }
/**
* API for test only
http://git-wip-us.apache.org/repos/asf/carbondata/blob/738f34b6/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 0b96202..9f834a9 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -889,7 +889,10 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("basic scenario") {
sql("drop table if exists mvtable1")
+ sql("drop table if exists mvtable2")
sql("create table mvtable1(name string,age int,salary int) stored by 'carbondata'")
+ sql("create table mvtable2(name string,age int,salary int) stored by 'carbondata'")
+ sql("create datamap MV11 using 'mv' as select name from mvtable2")
sql(" insert into mvtable1 select 'n1',12,12")
sql(" insert into mvtable1 select 'n1',12,12")
sql(" insert into mvtable1 select 'n3',12,12")
@@ -897,6 +900,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("update mvtable1 set(name) = ('updatedName')").show()
checkAnswer(sql("select count(*) from mvtable1 where name = 'updatedName'"),Seq(Row(4)))
sql("drop table if exists mvtable1")
+ sql("drop table if exists mvtable2")
}
def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {