You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/22 16:44:46 UTC
[04/50] [abbrv] carbondata git commit: [CARBONDATA-2475] Support
Modular Core for Materialized View DataMap for query matching and rewriting
[CARBONDATA-2475] Support Modular Core for Materialized View DataMap for query matching and rewriting
Support Modular Core for Materialized View DataMap
This closes #2302
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bf73e9fe
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bf73e9fe
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bf73e9fe
Branch: refs/heads/branch-1.4
Commit: bf73e9fe77523e23be46e7597e2c990e855401e5
Parents: d14c403
Author: ravipesala <ra...@gmail.com>
Authored: Sat May 12 22:49:19 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sun May 13 17:08:19 2018 +0800
----------------------------------------------------------------------
datamap/mv/core/pom.xml | 169 ++
.../carbondata/mv/datamap/MVAnalyzerRule.scala | 105 +
.../mv/datamap/MVDataMapProvider.scala | 125 +
.../apache/carbondata/mv/datamap/MVHelper.scala | 377 +++
.../apache/carbondata/mv/datamap/MVState.scala | 55 +
.../mv/rewrite/DefaultMatchMaker.scala | 647 +++++
.../carbondata/mv/rewrite/MatchConditions.scala | 28 +
.../carbondata/mv/rewrite/MatchMaker.scala | 47 +
.../carbondata/mv/rewrite/Navigator.scala | 196 ++
.../carbondata/mv/rewrite/QueryRewrite.scala | 53 +
.../mv/rewrite/SummaryDatasetCatalog.scala | 168 ++
.../apache/carbondata/mv/rewrite/Utils.scala | 358 +++
.../mv/rewrite/MVCreateTestCase.scala | 676 +++++
.../mv/rewrite/MVSampleTestCase.scala | 167 ++
.../carbondata/mv/rewrite/MVTPCDSTestCase.scala | 146 +
.../carbondata/mv/rewrite/MVTpchTestCase.scala | 247 ++
.../SelectSelectExactChildrenSuite.scala | 76 +
.../carbondata/mv/rewrite/Tpcds_1_4_Suite.scala | 80 +
.../mv/rewrite/matching/TestSQLBatch.scala | 214 ++
.../rewrite/matching/TestTPCDS_1_4_Batch.scala | 2496 ++++++++++++++++++
20 files changed, 6430 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/mv/core/pom.xml b/datamap/mv/core/pom.xml
new file mode 100644
index 0000000..99a8e22
--- /dev/null
+++ b/datamap/mv/core/pom.xml
@@ -0,0 +1,169 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-parent</artifactId>
+ <version>1.4.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>carbondata-mv-core</artifactId>
+ <name>Apache CarbonData :: Materialized View Core</name>
+
+ <properties>
+ <dev.path>${basedir}/../../../dev</dev.path>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-mv-plan</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-spark2</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.18</version>
+ <!-- Note config is repeated in scalatest config -->
+ <configuration>
+ <skip>false</skip>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+ <systemProperties>
+ <java.awt.headless>true</java.awt.headless>
+ </systemProperties>
+ <testFailureIgnore>false</testFailureIgnore>
+ <failIfNoTests>false</failIfNoTests>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>testCompile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.4.1</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>com.ning.maven.plugins</groupId>
+ <artifactId>maven-duplicate-finder-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>1.0</version>
+ <!-- Note config is repeated in surefire config -->
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <testFailureIgnore>false</testFailureIgnore>
+ <filereports>CarbonTestSuite.txt</filereports>
+ <argLine>-ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ </argLine>
+ <stderr />
+ <environmentVariables>
+ </environmentVariables>
+ <systemProperties>
+ <java.awt.headless>true</java.awt.headless>
+ </systemProperties>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <profiles>
+ <profile>
+ <id>sdvtest</id>
+ <properties>
+ <maven.test.skip>true</maven.test.skip>
+ </properties>
+ </profile>
+ </profiles>
+</project>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
new file mode 100644
index 0000000..4e93f15
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.mv.datamap
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.expressions.{Alias, ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, DeserializeToObject, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.datamap.DataMapManager
+import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}
+
+/**
+ * Analyzer rule that rewrites a query plan to read from an MV datamap table when the MV
+ * catalog produces a rewritten plan for it.
+ *
+ * @param sparkSession session used to obtain the MV datamap provider and to analyze the
+ *                     rewritten SQL
+ */
+class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+
+  // TODO Find a better way to get the provider.
+  private val dataMapProvider =
+    DataMapManager.get().getDataMapProvider(null,
+      new DataMapSchema("", DataMapClassProvider.MV.getShortName), sparkSession)
+
+  private val LOGGER = LogServiceFactory.getLogService(classOf[MVAnalyzerRule].getName)
+
+  /**
+   * Returns the analyzed plan of the MV-rewritten SQL when a rewrite applies, otherwise the
+   * input plan unchanged.
+   *
+   * @param plan plan handed to this rule
+   * @return the re-analyzed rewritten plan, or `plan` itself when no rewrite applies
+   */
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    var needAnalysis = true
+    // The traversal is used only to inspect expressions: every case returns its input
+    // unchanged and the transformed plan is discarded.
+    plan.transformAllExpressions {
+      // If a scala UDF aliased as "preAgg" is present, the query comes from a datamap
+      // creation/load flow, so the query plan must not be rewritten.
+      // TODO Add different UDF name
+      case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase("preAgg") =>
+        needAnalysis = false
+        al
+      // If any unresolved alias or attribute is present, wait for the plan to be resolved:
+      // the plan can be transformed only when everything is resolved.
+      case unresolveAlias@UnresolvedAlias(_, _) =>
+        needAnalysis = false
+        unresolveAlias
+      case attr@UnresolvedAttribute(_) =>
+        needAnalysis = false
+        attr
+    }
+    val catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
+      DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
+    if (needAnalysis && catalog != null && isValidPlan(plan, catalog)) {
+      val modularPlan = catalog.mVState.rewritePlan(plan).withSummaryData
+      if (modularPlan.find (_.rewritten).isDefined) {
+        // Some node was rewritten: regenerate SQL from the modular plan and analyze it again.
+        val compactSQL = modularPlan.asCompactSQL
+        LOGGER.audit(s"\n$compactSQL\n")
+        val analyzed = sparkSession.sql(compactSQL).queryExecution.analyzed
+        analyzed
+      } else {
+        plan
+      }
+    } else {
+      plan
+    }
+  }
+
+  /**
+   * A plan is eligible for rewriting when it is neither a `Command` nor a
+   * `DeserializeToObject` and [[isDataMapExists]] returns false for it.
+   *
+   * @param plan    plan to check
+   * @param catalog MV catalog whose registered schemas are consulted
+   * @return true if the plan may be rewritten
+   */
+  def isValidPlan(plan: LogicalPlan, catalog: SummaryDatasetCatalog): Boolean = {
+    !plan.isInstanceOf[Command] && !isDataMapExists(plan, catalog.listAllSchema()) &&
+    !plan.isInstanceOf[DeserializeToObject]
+  }
+
+  /**
+   * Checks whether the plan already reads from one of the MV datamap tables.
+   *
+   * Also returns true when the plan contains no `LogicalRelation` at all, so such plans are
+   * never rewritten.
+   *
+   * @param plan plan to inspect
+   * @param mvs  registered MV datasets
+   * @return true if the plan has no relation or one of its relations is an MV datamap table
+   */
+  def isDataMapExists(plan: LogicalPlan, mvs: Array[SummaryDataset]): Boolean = {
+    val catalogs = plan collect {
+      case l: LogicalRelation => l.catalogTable
+    }
+    // A relation without a catalog table cannot be compared against an MV table identifier;
+    // flatten skips it instead of failing on Option.get.
+    catalogs.isEmpty || catalogs.flatten.exists { table =>
+      mvs.exists { mv =>
+        val identifier = mv.dataMapSchema.getRelationIdentifier
+        identifier.getTableName.equals(table.identifier.table) &&
+        identifier.getDatabaseName.equals(table.database)
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
new file mode 100644
index 0000000..2aba23e
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.mv.datamap
+
+import java.io.IOException
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
+import org.apache.spark.sql.execution.datasources.FindDataSourceTable
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.core.datamap.{DataMapCatalog, DataMapProvider, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory}
+import org.apache.carbondata.core.indexstore.Blocklet
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}
+
+/**
+ * DataMap provider for MV datamaps: creates and drops the table backing the datamap and
+ * reloads it from the datamap's CTAS query.
+ */
+@InterfaceAudience.Internal
+class MVDataMapProvider(
+    mainTable: CarbonTable,
+    sparkSession: SparkSession,
+    dataMapSchema: DataMapSchema)
+  extends DataMapProvider(mainTable, dataMapSchema) {
+
+  // Created by cleanMeta and reused by cleanData; stays null until cleanMeta has run.
+  protected var dropTableCommand: CarbonDropTableCommand = null
+
+  /**
+   * Creates the MV datamap table for the given select statement and registers this provider's
+   * catalog.
+   *
+   * @param ctasSqlStatement select statement defining the MV; must not be null
+   */
+  @throws[MalformedDataMapCommandException]
+  @throws[IOException]
+  override def initMeta(ctasSqlStatement: String): Unit = {
+    if (ctasSqlStatement == null) {
+      throw new MalformedDataMapCommandException(
+        "select statement is mandatory")
+    }
+    MVHelper.createMVDataMap(sparkSession, dataMapSchema, ctasSqlStatement, ifNotExistsSet = true)
+    DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema)
+  }
+
+  /** Nothing to do at this stage. */
+  override def initData(): Unit = {
+  }
+
+  /**
+   * Runs the metadata step of dropping the datamap table, then unregisters the catalog entry
+   * and drops the stored datamap schema.
+   */
+  @throws[IOException]
+  override def cleanMeta(): Unit = {
+    val relation = dataMapSchema.getRelationIdentifier
+    dropTableCommand = new CarbonDropTableCommand(true,
+      Some(relation.getDatabaseName),
+      relation.getTableName,
+      true)
+    dropTableCommand.processMetadata(sparkSession)
+    DataMapStoreManager.getInstance.unRegisterDataMapCatalog(dataMapSchema)
+    DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
+  }
+
+  /** Runs the data step of the drop command prepared by cleanMeta, if there is one. */
+  override def cleanData(): Unit = {
+    Option(dropTableCommand).foreach(_.processData(sparkSession))
+  }
+
+  /**
+   * Re-runs the datamap's CTAS query and loads its result into the datamap table with
+   * overwrite enabled. Does nothing when the schema has no CTAS query.
+   */
+  @throws[IOException]
+  override def rebuild(): Unit = {
+    Option(dataMapSchema.getCtasQuery).foreach { ctasQuery =>
+      val identifier = dataMapSchema.getRelationIdentifier
+      val dataMapTable =
+        TableIdentifier(identifier.getTableName, Some(identifier.getDatabaseName))
+      val resolvedRelation = new FindDataSourceTable(sparkSession).apply(
+        sparkSession.sessionState.catalog.lookupRelation(dataMapTable))
+      // Unwrap the alias so that the output below is that of the alias' child plan.
+      val logicalPlan = resolvedRelation match {
+        case s: SubqueryAlias => s.child
+        case other => other
+      }
+      val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(ctasQuery)
+      val analyzedQuery = sparkSession.sql(updatedQuery).queryExecution.analyzed
+      val queryPlan = SparkSQLUtil.execute(analyzedQuery, sparkSession).drop("preAgg")
+      // The load's file header is the comma separated list of the datamap table columns.
+      val header = logicalPlan.output.map(_.name).mkString(",")
+      val loadCommand = CarbonLoadDataCommand(
+        databaseNameOp = Some(identifier.getDatabaseName),
+        tableName = identifier.getTableName,
+        factPathFromUser = null,
+        dimFilesPath = Seq(),
+        options = scala.collection.immutable.Map("fileheader" -> header),
+        isOverwriteTable = true,
+        inputSqlString = null,
+        dataFrame = Some(queryPlan),
+        updateModel = None,
+        tableInfoOp = None,
+        internalOptions = Map.empty,
+        partition = Map.empty)
+
+      SparkSQLUtil.execute(loadCommand, sparkSession)
+    }
+  }
+
+  /** Incremental build is not supported by this provider. */
+  @throws[IOException]
+  override def incrementalBuild(
+      segmentIds: Array[String]): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  /** Creates a new summary dataset catalog bound to this provider's session. */
+  override def createDataMapCatalog : DataMapCatalog[SummaryDataset] =
+    new SummaryDatasetCatalog(sparkSession)
+
+  /** This provider does not expose a datamap factory. */
+  override def getDataMapFactory: DataMapFactory[_ <: DataMap[_ <: Blocklet]] = {
+    throw new UnsupportedOperationException
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
new file mode 100644
index 0000000..0f9362f
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.mv.datamap
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{Field, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, DataMapSchemaStorageProvider, RelationIdentifier}
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular.{GroupBy, Matchable, ModularPlan, Select}
+import org.apache.carbondata.mv.rewrite.{DefaultMatchMaker, QueryRewrite}
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Utility for MV datamap operations.
+ */
+object MVHelper {
+
+ /**
+  * Creates the table that backs an MV datamap and saves the datamap schema.
+  *
+  * The query is analyzed (with the "preAgg" marker added and then dropped) to derive the
+  * columns of the datamap table. The table is named `<datamap name>_table` and is created in
+  * the database of the first table read by the query, unless the datamap properties contain
+  * an explicit "path" for its location.
+  *
+  * @param sparkSession   session used to analyze the query and run the create-table command
+  * @param dataMapSchema  datamap schema; updated with the CTAS query, the identifier of the
+  *                       created table and the parent tables, then saved
+  * @param queryString    select statement that defines the MV
+  * @param ifNotExistsSet passed to the table model as its "if not exists" flag
+  * @throws UnsupportedOperationException if the query does not read from any table
+  */
+ def createMVDataMap(sparkSession: SparkSession,
+     dataMapSchema: DataMapSchema,
+     queryString: String,
+     ifNotExistsSet: Boolean = false): Unit = {
+   val dmProperties = dataMapSchema.getProperties.asScala
+   val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
+   val logicalPlan = sparkSession.sql(updatedQuery).drop("preAgg").queryExecution.analyzed
+   val fields = logicalPlan.output.map { attr =>
+     val name = updateColumnName(attr)
+     val rawSchema = '`' + name + '`' + ' ' + attr.dataType.typeName
+     if (attr.dataType.typeName.startsWith("decimal")) {
+       // Decimal columns additionally carry the precision and scale of the attribute type.
+       val (precision, scale) = CommonUtil.getScaleAndPrecision(attr.dataType.catalogString)
+       Field(column = name,
+         dataType = Some(attr.dataType.typeName),
+         name = Some(name),
+         children = None,
+         precision = precision,
+         scale = scale,
+         rawSchema = rawSchema)
+     } else {
+       Field(column = name,
+         dataType = Some(attr.dataType.typeName),
+         name = Some(name),
+         children = None,
+         rawSchema = rawSchema)
+     }
+   }
+   val tableProperties = mutable.Map[String, String]()
+   dmProperties.foreach(t => tableProperties.put(t._1, t._2))
+
+   val selectTables = getTables(logicalPlan)
+   // The datamap table is created in the database of the first parent table, so a query that
+   // reads from no table cannot be materialized. Fail with a clear message, before anything
+   // is created, instead of the NoSuchElementException raised by calling head on an empty Seq.
+   val firstParentTable = selectTables.headOption.getOrElse(
+     throw new UnsupportedOperationException(
+       "MV datamap query must select from at least one table"))
+
+   // TODO inherit the table properties like sort order, sort scope and block size from parent
+   // tables to mv datamap table
+   // TODO Use a proper DB
+   val tableIdentifier =
+     TableIdentifier(dataMapSchema.getDataMapName + "_table",
+       firstParentTable.identifier.database)
+   // prepare table model of the collected tokens
+   val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
+     ifNotExistPresent = ifNotExistsSet,
+     new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
+     tableIdentifier.table.toLowerCase,
+     fields,
+     Seq(),
+     tableProperties,
+     None,
+     isAlterFlow = false,
+     None)
+
+   val tablePath = if (dmProperties.contains("path")) {
+     dmProperties("path")
+   } else {
+     CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
+   }
+   CarbonCreateTableCommand(TableNewProcessor(tableModel),
+     tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession)
+
+   // Record how the datamap was defined and which tables it relates to, then persist it.
+   dataMapSchema.setCtasQuery(queryString)
+   dataMapSchema
+     .setRelationIdentifier(new RelationIdentifier(tableIdentifier.database.get,
+       tableIdentifier.table,
+       ""))
+
+   val parentIdents = selectTables.map { table =>
+     new RelationIdentifier(table.database, table.identifier.table, "")
+   }
+   dataMapSchema.setParentTables(new util.ArrayList[RelationIdentifier](parentIdents.asJava))
+   DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
+ }
+
+ /**
+  * Derives a column name from an attribute: "(" and spaces become "_", ")" and "=" are
+  * removed, and the attribute's qualifier, when present, is prepended with an underscore.
+  *
+  * @param attr attribute to name
+  * @return the derived column name
+  */
+ def updateColumnName(attr: Attribute): String = {
+   val sanitized = attr.name
+     .replace("(", "_")
+     .replace(")", "")
+     .replace(" ", "_")
+     .replace("=", "")
+   attr.qualifier match {
+     case Some(qualifier) => qualifier + "_" + sanitized
+     case None => sanitized
+   }
+ }
+
+ /**
+  * Collects the catalog table of every `LogicalRelation` in the plan.
+  *
+  * Fails with the exception of `Option.get` if a relation carries no catalog table.
+  *
+  * @param logicalPlan plan to scan
+  * @return catalog tables of the plan's relations, in collection order
+  */
+ def getTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
+   val relations = logicalPlan.collect { case relation: LogicalRelation => relation }
+   relations.map(_.catalogTable.get)
+ }
+
+ /**
+  * Removes the "preAgg" marker expressions from every `Project` and `Aggregate` node of the
+  * plan.
+  *
+  * @param plan plan to clean
+  * @return plan whose project lists and aggregate expressions no longer contain the marker
+  */
+ def dropDummFuc(plan: LogicalPlan): LogicalPlan = {
+   plan transform {
+     case Project(projectList, child) =>
+       Project(dropDummyExp(projectList), child)
+     case Aggregate(groupingExprs, aggregateExprs, child) =>
+       Aggregate(groupingExprs, dropDummyExp(aggregateExprs), child)
+   }
+ }
+
+ /**
+  * Filters out the "preAgg" marker: aliases of a scala UDF and attribute references whose
+  * name equals "preAgg" ignoring case. All other expressions are kept in order.
+  */
+ private def dropDummyExp(exps: Seq[NamedExpression]) = {
+   exps.filterNot {
+     case Alias(_: ScalaUDF, name) => name.equalsIgnoreCase("preAgg")
+     case attr: AttributeReference => attr.name.equalsIgnoreCase("preAgg")
+     case _ => false
+   }
+ }
+
+ /**
+  * Builds a lookup from the expressions of `subsume` to aliases over the expression at the
+  * same position in `subsumer`.
+  *
+  * For each pair, keys are created for the subsume expression itself, for the child of the
+  * expression when it is an `Alias`, and for every attribute reference found inside it. When
+  * the same key occurs more than once, the entry added last wins.
+  *
+  * @param subsumer output expressions to map to
+  * @param subsume  output expressions to map from; must have the same length as `subsumer`
+  * @return map from subsume-side keys to aliases over the subsumer-side expressions
+  * @throws UnsupportedOperationException if the two lists differ in length
+  */
+ def getAttributeMap(subsumer: Seq[NamedExpression],
+     subsume: Seq[NamedExpression]): Map[AttributeKey, NamedExpression] = {
+   if (subsumer.length != subsume.length) {
+     throw new UnsupportedOperationException("Cannot create mapping with unequal sizes")
+   }
+   subsume.zip(subsumer).flatMap { case (left, right) =>
+     val referenced = left collect {
+       case attr: AttributeReference =>
+         (AttributeKey(attr), createAttrReference(right, attr.name))
+     }
+     val aliased: Seq[(AttributeKey, Alias)] = left match {
+       case a: Alias => Seq((AttributeKey(a.child), createAttrReference(right, a.name)))
+       case _ => Seq.empty
+     }
+     // Keep this order: it decides which entry survives when keys collide in toMap.
+     Seq((AttributeKey(left), createAttrReference(right, left.name))) ++ aliased ++ referenced
+   }.toMap
+ }
+
+ /**
+  * Wraps `ref` in an `Alias` with the given name that reuses the expression id of `ref` and
+  * has no qualifier. Despite the method name, the result is an `Alias`.
+  *
+  * @param ref  expression to alias
+  * @param name name of the alias
+  * @return alias over `ref`
+  */
+ def createAttrReference(ref: NamedExpression, name: String): Alias =
+   Alias(child = ref, name = name)(exprId = ref.exprId, qualifier = None)
+
+ /**
+  * Map key that wraps an expression and compares it semantically.
+  *
+  * Equality delegates to `Expression.semanticEquals` and the hash code to
+  * `Expression.semanticHash`, so two keys that are equal always produce the same hash.
+  *
+  * @param exp wrapped expression
+  */
+ case class AttributeKey(exp: Expression) {
+
+   override def equals(other: Any): Boolean = other match {
+     case attrKey: AttributeKey =>
+       exp.semanticEquals(attrKey.exp)
+     case _ => false
+   }
+
+   // Must agree with equals: semantically equal expressions may differ in their plain
+   // hashCode, which would make hash based map lookups miss an equal key.
+   override def hashCode: Int = exp.semanticHash()
+
+ }
+
+ /**
+ * Replaces expressions by their counterparts from `attrMap`, so that they refer to the
+ * output of the datamap table relation.
+ *
+ * Expressions without an entry in `attrMap` are returned unchanged.
+ *
+ * @param expressions expressions which are needed to update
+ * @param attrMap mapping from original expressions to their replacements
+ * @param aliasName table alias name, used as qualifier of attributes taken from aggregates
+ * @param keepAlias if true, a replaced attribute reference keeps the qualifier of the
+ * original attribute instead of being replaced by the mapped expression as is
+ * @return Updated expressions
+ */
+ def updateSubsumeAttrs(
+ expressions: Seq[Expression],
+ attrMap: Map[AttributeKey, NamedExpression],
+ aliasName: Option[String],
+ keepAlias: Boolean = false): Seq[Expression] = {
+
+ // Picks the expression to reference from a mapped expression.
+ def getAttribute(exp: Expression) = {
+ exp match {
+ // Alias over an aggregate: take the first attribute reference inside the aggregate
+ // function and qualify it with aliasName. `.head` fails if there is none.
+ case Alias(agg: AggregateExpression, name) =>
+ agg.aggregateFunction.collect {
+ case attr: AttributeReference =>
+ AttributeReference(attr.name, attr.dataType, attr.nullable, attr
+ .metadata)(attr.exprId,
+ aliasName,
+ attr.isGenerated)
+ }.head
+ // Any other alias: use the aliased child.
+ case Alias(child, name) =>
+ child
+ case other => other
+ }
+ }
+
+ expressions.map {
+ // Aliased aggregate: keep the alias identity (name, exprId, qualifier, metadata) but
+ // point it at the expression derived from the mapped entry.
+ case alias@Alias(agg: AggregateExpression, name) =>
+ attrMap.get(AttributeKey(alias)).map { exp =>
+ Alias(getAttribute(exp), name)(alias.exprId,
+ alias.qualifier,
+ alias.explicitMetadata,
+ alias.isGenerated)
+ }.getOrElse(alias)
+
+ // Plain attribute: use the mapped expression; with keepAlias rebuild it as an attribute
+ // reference that carries the original attribute's qualifier.
+ case attr: AttributeReference =>
+ val uattr = attrMap.get(AttributeKey(attr)).map{a =>
+ if (keepAlias) {
+ AttributeReference(a.name, a.dataType, a.nullable, a.metadata)(a.exprId,
+ attr.qualifier,
+ a.isGenerated)
+ } else {
+ a
+ }
+ }.getOrElse(attr)
+ uattr
+ // Anything else: replace it only if the whole expression has a mapping.
+ case expression: Expression =>
+ val uattr = attrMap.getOrElse(AttributeKey(expression), expression)
+ uattr
+ }
+ }
+
+ /**
+  * Rewrites the subsumer output list against the datamap relation and re-aliases rewritten
+  * attributes so that every output keeps the name and expression id of the original output
+  * at the same position.
+  *
+  * The first value of the relation's alias map is used as table alias; this fails if the
+  * alias map is empty.
+  *
+  * @param subsumerOutputList output expressions to rewrite
+  * @param dataMapRltn        datamap table relation
+  * @param aliasMap           mapping from original expressions to their replacements
+  * @param keepAlias          passed through to updateSubsumeAttrs
+  * @return rewritten output list
+  */
+ def updateOutPutList(
+     subsumerOutputList: Seq[NamedExpression],
+     dataMapRltn: Select,
+     aliasMap: Map[AttributeKey, NamedExpression],
+     keepAlias: Boolean): Seq[NamedExpression] = {
+   val tableAlias = Some(dataMapRltn.aliasMap.values.head)
+   val rewritten =
+     updateSubsumeAttrs(subsumerOutputList, aliasMap, tableAlias, keepAlias)
+       .asInstanceOf[Seq[NamedExpression]]
+   rewritten.zip(subsumerOutputList).map {
+     // A bare attribute, or an alias directly over one, is exposed under the original
+     // output's name and expression id.
+     case (attr: AttributeReference, original) =>
+       Alias(attr, original.name)(original.exprId, None)
+     case (Alias(attr: AttributeReference, _), original) =>
+       Alias(attr, original.name)(original.exprId, None)
+     case (other, _) => other
+   }
+ }
+
+ /**
+  * Replaces the attribute references inside the predicates by their mapped expressions.
+  *
+  * @param predicates predicates to rewrite
+  * @param attrMap    mapping from original attributes to their replacements
+  * @param keepAlias  if true, a replacement is rebuilt as an attribute reference that keeps
+  *                   the qualifier of the attribute it replaces
+  * @return rewritten predicates; attributes without a mapping are left as they are
+  */
+ def updateSelectPredicates(
+     predicates: Seq[Expression],
+     attrMap: Map[AttributeKey, NamedExpression],
+     keepAlias: Boolean): Seq[Expression] = {
+   predicates.map { predicate =>
+     predicate transform {
+       case attr: AttributeReference =>
+         attrMap.get(AttributeKey(attr)) match {
+           case Some(mapped) if keepAlias =>
+             AttributeReference(mapped.name, mapped.dataType, mapped.nullable, mapped.metadata)(
+               mapped.exprId,
+               attr.qualifier,
+               mapped.isGenerated)
+           case Some(mapped) => mapped
+           case None => attr
+         }
+     }
+   }
+ }
+
+ /**
+ * Update the modular plan as per the datamap table relation inside it.
+ *
+ * Handles a Select or GroupBy that carries a datamap table relation, and a Select whose only
+ * child is such a Select or GroupBy. Every other plan is returned unchanged.
+ *
+ * @param subsumer plan to be updated
+ * @param rewrite query rewrite context; here it is only passed on to the recursive calls
+ * @return Updated modular plan.
+ */
+ def updateDataMap(subsumer: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
+ subsumer match {
+ // Select matched by a datamap: return the datamap relation itself, with its outputs
+ // aliased to the names and expression ids of the original outputs where names differ.
+ case s: Select if s.dataMapTableRelation.isDefined =>
+ val relation = s.dataMapTableRelation.get.asInstanceOf[Select]
+ val mappings = s.outputList zip relation.outputList
+ val oList = for ((o1, o2) <- mappings) yield {
+ if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
+ }
+ relation.copy(outputList = oList).setRewritten()
+ // GroupBy matched by a datamap: aggregate over the datamap relation instead.
+ case g: GroupBy if g.dataMapTableRelation.isDefined =>
+ val relation = g.dataMapTableRelation.get.asInstanceOf[Select]
+ val in = relation.asInstanceOf[Select].outputList
+ val mappings = g.outputList zip relation.outputList
+ val oList = for ((left, right) <- mappings) yield {
+ left match {
+ // sum, max and min are applied again on the corresponding datamap column
+ case Alias(agg@AggregateExpression(fun@Sum(child), _, _, _), name) =>
+ val uFun = fun.copy(child = right)
+ Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId)
+ case Alias(agg@AggregateExpression(fun@Max(child), _, _, _), name) =>
+ val uFun = fun.copy(child = right)
+ Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId)
+ case Alias(agg@AggregateExpression(fun@Min(child), _, _, _), name) =>
+ val uFun = fun.copy(child = right)
+ Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId)
+ // a single-argument count becomes a sum over the corresponding datamap column
+ case Alias(agg@AggregateExpression(fun@Count(Seq(child)), _, _, _), name) =>
+ val uFun = Sum(right)
+ Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId)
+ // everything else is taken from the datamap column, aliased if the names differ
+ case _ =>
+ if (left.name != right.name) Alias(right, left.name)(exprId = left.exprId) else right
+ }
+ }
+ // Replace each predicate by the datamap column of the output it semantically equals;
+ // predicates without a matching output are kept as they are.
+ val updatedPredicates = g.predicateList.map { f =>
+ mappings.find{ case (k, y) =>
+ k match {
+ case a: Alias if f.isInstanceOf[Alias] =>
+ a.child.semanticEquals(f.children.head)
+ case a: Alias => a.child.semanticEquals(f)
+ case other => other.semanticEquals(f)
+ }
+ } match {
+ case Some(r) => r._2
+ case _ => f
+ }
+ }
+ g.copy(outputList = oList,
+ inputList = in,
+ predicateList = updatedPredicates,
+ child = relation,
+ dataMapTableRelation = None).setRewritten()
+
+ // Select without its own datamap relation: look at its single child.
+ case select: Select =>
+ select.children match {
+ case Seq(s: Select) if s.dataMapTableRelation.isDefined =>
+ val relation = s.dataMapTableRelation.get.asInstanceOf[Select]
+ val child = updateDataMap(s, rewrite).asInstanceOf[Select]
+ val aliasMap = getAttributeMap(relation.outputList, s.outputList)
+ var outputSel =
+ updateOutPutList(select.outputList, relation, aliasMap, keepAlias = true)
+ val pred = updateSelectPredicates(select.predicateList, aliasMap, true)
+ select.copy(outputList = outputSel,
+ inputList = child.outputList,
+ predicateList = pred,
+ children = Seq(child)).setRewritten()
+
+ case Seq(g: GroupBy) if g.dataMapTableRelation.isDefined =>
+ val relation = g.dataMapTableRelation.get.asInstanceOf[Select]
+ val aliasMap = getAttributeMap(relation.outputList, g.outputList)
+
+ val outputSel =
+ updateOutPutList(select.outputList, relation, aliasMap, keepAlias = false)
+ val child = updateDataMap(g, rewrite).asInstanceOf[Matchable]
+ // TODO Remove the unnecessary columns from selection.
+ // Only keep columns which are required by parent.
+ val inputSel = child.outputList
+ select.copy(
+ outputList = outputSel,
+ inputList = inputSel,
+ children = Seq(child)).setRewritten()
+
+ case _ => select
+ }
+
+ case other => other
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVState.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVState.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVState.scala
new file mode 100644
index 0000000..412d547
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVState.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.datamap
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.plans.modular.SimpleModularizer
+import org.apache.carbondata.mv.plans.util.BirdcageOptimizer
+import org.apache.carbondata.mv.rewrite.{DefaultMatchMaker, Navigator, QueryRewrite, SummaryDatasetCatalog}
+
+/**
+ * A class that holds all session-specific state.
+ */
+private[mv] class MVState(summaryDatasetCatalog: SummaryDatasetCatalog) {
+
+ // Note: These are all lazy vals because they depend on each other (e.g. conf) and we
+ // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.
+
+ /**
+ * Modular query plan modularizer
+ */
+ lazy val modularizer = SimpleModularizer
+
+ /**
+ * Logical query plan optimizer.
+ */
+ lazy val optimizer = BirdcageOptimizer
+
+ lazy val matcher = DefaultMatchMaker
+
+ lazy val navigator: Navigator = new Navigator(summaryDatasetCatalog, this)
+
+ /**
+ * Rewrite the logical query plan to MV plan if applicable.
+ * @param plan
+ * @return
+ */
+ def rewritePlan(plan: LogicalPlan): QueryRewrite = new QueryRewrite(this, plan)
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
new file mode 100644
index 0000000..899c36c
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+
+import org.apache.carbondata.mv.datamap.MVHelper
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular._
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.util.SQLBuilder
+
+ /**
+  * A [[MatchMaker]] specialised to [[ModularPlan]] trees. It adds no members of its own.
+  */
+ abstract class DefaultMatchMaker extends MatchMaker[ModularPlan]
+
+ /**
+  * Base class of the match patterns that operate on [[ModularPlan]] trees.
+  *
+  * Besides naming the pattern it provides `factorOutSubsumer`, which re-points the attribute
+  * references of a compensation plan at the output of the subsumer.
+  */
+ abstract class DefaultMatchPattern extends MatchPattern[ModularPlan] {
+
+   /** Name for this pattern, automatically inferred based on class name. */
+   val patternName: String = {
+     val className = getClass.getName
+     // Drop a trailing '$' from the runtime class name.
+     if (className endsWith "$") className.dropRight(1) else className
+   }
+
+   /**
+    * Rewrites the attribute references of `compensation` so that they refer to the output of
+    * `subsumer`.
+    *
+    * Step 1: in every plan node that is neither flagged `skip` nor equal to `subsumer`, an
+    * attribute that the subsumer re-exports under an alias is replaced by a reference to that
+    * alias (the qualifier of the original reference is kept).
+    *
+    * Step 2: only when `aliasMapMain` has exactly one entry, the attributes and aliases handled
+    * by `transformExpressions` on the resulting plan that belong to the subsumer output are
+    * re-created with the qualifier stored under key 0 of `aliasMapMain` (no qualifier when that
+    * key is missing).
+    *
+    * @param compensation plan that sits on top of the subsumer
+    * @param subsumer     plan whose output the compensation has to be expressed in
+    * @param aliasMapMain child-index to alias-name map of the compensation
+    * @return the compensation plan with re-pointed references
+    * @throws UnsupportedOperationException if the subsumer output contains duplicate names
+    */
+   def factorOutSubsumer(
+       compensation: ModularPlan,
+       subsumer: Matchable,
+       aliasMapMain: Map[Int, String]): ModularPlan = {
+
+     // Create aliasMap with attribute to alias reference attribute
+     val aliasMap = AttributeMap(
+       subsumer.outputList.collect {
+         case a @ Alias(attr: Attribute, _) => (attr, a.toAttribute)
+       })
+
+     // Check and replace all alias references with subsumer alias map references.
+     val compensation1 = compensation.transform {
+       case plan if !plan.skip && plan != subsumer =>
+         plan.transformExpressions {
+           case a: AttributeReference =>
+             aliasMap
+               .get(a)
+               .map { ref =>
+                 AttributeReference(ref.name, ref.dataType)(
+                   exprId = ref.exprId,
+                   qualifier = a.qualifier)
+               }.getOrElse(a)
+         }
+     }
+
+     val subqueryAttributeSet = SQLBuilder.collectAttributeSet(subsumer.outputList)
+     if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) {
+       // The exception used to be constructed without `throw`, which made this check a no-op.
+       throw new UnsupportedOperationException(
+         s"duplicate name(s): ${ subsumer.output.map(_.toString).mkString(", ") }")
+     }
+     if (aliasMapMain.size == 1) {
+       val subsumerName: Option[String] = aliasMapMain.get(0)
+       // Replace all compensation1 attributes with references of the subsumer attribute set
+       compensation1.transformExpressions {
+         case ref: Attribute if subqueryAttributeSet.contains(ref) =>
+           AttributeReference(ref.name, ref.dataType)(
+             exprId = ref.exprId,
+             qualifier = subsumerName)
+         case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) =>
+           Alias(alias.child, alias.name)(exprId = alias.exprId, qualifier = subsumerName)
+       }
+     } else {
+       compensation1
+     }
+   }
+ }
+
+ /**
+  * The match maker instance that carries the list of match patterns defined in this file.
+  */
+ object DefaultMatchMaker extends DefaultMatchMaker {
+   lazy val patterns = List(
+     SelectSelectNoChildDelta,
+     GroupbyGroupbyNoChildDelta,
+     GroupbyGroupbySelectOnlyChildDelta,
+     GroupbyGroupbyGroupbyChildDelta,
+     SelectSelectSelectChildDelta,
+     SelectSelectGroupbyChildDelta)
+ }
+
+/**
+ * Convention:
+ * EmR: each subsumee's expression match some of subsumer's expression
+ * EdR: each subsumee's expression derive from some of subsumer's expression
+ * RmE: each subsumer's expression match some of subsumee's expression
+ * RdE: each subsumer's expression derive from some of subsumee's expression
+ */
+
+object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper {
+ /**
+  * Checks whether an expression of the subsumee can be obtained from the subsumer.
+  *
+  * A subsumee predicate is derivable when the subsumer has a semantically equal predicate or
+  * `canEvaluate` accepts it for the subsumer. A subsumee output expression is derivable when
+  * `exprListR` holds a semantically equal expression (for aliases: an alias with a semantically
+  * equal child) or, while scanning a non-empty `exprListR`, `canEvaluate` accepts it.
+  * Expressions that are neither a predicate nor an output of the subsumee are not derivable.
+  * The `compensation` argument is not consulted.
+  */
+ private def isDerivable(
+     exprE: Expression,
+     exprListR: Seq[Expression],
+     subsumee: ModularPlan,
+     subsumer: ModularPlan,
+     compensation: Option[ModularPlan]): Boolean = {
+   val query = subsumee.asInstanceOf[Select]
+
+   // Shared fallback for output expressions: exact match or evaluable on the subsumer.
+   def matchesOrEvaluable(candidate: Expression): Boolean =
+     exprListR.exists(r => r.semanticEquals(candidate) || canEvaluate(candidate, subsumer))
+
+   if (query.predicateList.contains(exprE)) {
+     val view = subsumer.asInstanceOf[Select]
+     view.predicateList.exists(_.semanticEquals(exprE)) || canEvaluate(exprE, subsumer)
+   } else if (query.outputList.contains(exprE)) {
+     exprE match {
+       case alias: Alias =>
+         val sameChildAliasExists = exprListR.exists {
+           case r: Alias => r.child.semanticEquals(alias.child)
+           case _ => false
+         }
+         sameChildAliasExists || matchesOrEvaluable(alias)
+       case plain => matchesOrEvaluable(plain)
+     }
+   } else {
+     false
+   }
+ }
+
+ /**
+  * Matches a Select subsumer against a Select subsumee when no compensation was produced for
+  * their children (`compensation` must be `None`).
+  *
+  * Two shapes are handled: both plans have only `LeafNode` children, or both plans have only
+  * `GroupBy` children. The result is `Seq(subsumer)` when the plans match exactly, a single
+  * compensation Select placed on top of the subsumer when the subsumee can be derived from it,
+  * and `Nil` otherwise.
+  */
+ def apply(
+ subsumer: ModularPlan,
+ subsumee: ModularPlan,
+ compensation: Option[ModularPlan],
+ rewrite: QueryRewrite): Seq[ModularPlan] = {
+
+ (subsumer, subsumee, compensation) match {
+ // Case 1: both Selects sit directly on leaf nodes.
+ case (
+ sel_1a @ modular.Select(_, _, _, _, _, _, _, _, _, _),
+ sel_1q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None
+ ) if sel_1a.children.forall { _.isInstanceOf[modular.LeafNode] } &&
+ sel_1q.children.forall { _.isInstanceOf[modular.LeafNode] } =>
+
+ // assume children (including harmonized relation) of subsumer and subsumee
+ // are 1-1 correspondence.
+ // Change the following two conditions to more complicated ones if we want to
+ // consider things that combine extrajoin, rejoin, and harmonized relations
+ // Despite the "is" prefix these are collections: the children of one side that do not occur
+ // exactly once on the other side. Both must be empty for a match.
+ val isUniqueRmE = subsumer.children.filter { x => subsumee.children.count(_ == x) != 1 }
+ val isUniqueEmR = subsumee.children.filter { x => subsumer.children.count(_ == x) != 1 }
+
+ // extrajoin: children only the subsumer has; rejoin: children only the subsumee has.
+ val extrajoin = sel_1a.children.filterNot { child => sel_1q.children.contains(child) }
+ val rejoin = sel_1q.children.filterNot { child => sel_1a.children.contains(child) }
+ val rejoinOutputList = rejoin.flatMap(_.output)
+
+ val isPredicateRmE = sel_1a.predicateList.forall(expr =>
+ sel_1q.predicateList.exists(_.semanticEquals(expr)))
+ val isPredicateEmdR = sel_1q.predicateList.forall(expr =>
+ isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))
+ val isOutputEdR = sel_1q.outputList.forall(expr =>
+ isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))
+
+ if (isUniqueRmE.isEmpty && isUniqueEmR.isEmpty && extrajoin.isEmpty && isPredicateRmE &&
+ isPredicateEmdR && isOutputEdR) {
+ // Pairs of (index in sel_1q.children -> index in sel_1a.children).
+ // The guarded case makes this `map` partial; it cannot fail here because
+ // `extrajoin.isEmpty` guarantees that every subsumer child is also a subsumee child.
+ val mappings = sel_1a.children.zipWithIndex.map {
+ case (childr, fromIdx) if sel_1q.children.contains(childr) =>
+ val toIndx = sel_1q.children.indexWhere(_ == childr)
+ (toIndx -> fromIdx)
+
+ }
+ val e2r = mappings.toMap
+ val r2e = e2r.map(_.swap)
+ // Every join edge of the subsumer must have an equivalent edge, with the same join
+ // conditions, in the subsumee.
+ val r2eJoinsMatch = sel_1a.joinEdges.forall { x =>
+ (r2e.get(x.left), r2e.get(x.right)) match {
+ case (Some(l), Some(r)) =>
+ val mappedEdge = JoinEdge(l, r, x.joinType)
+ val joinTypeEquivalent =
+ if (sel_1q.joinEdges.contains(mappedEdge)) true
+ else {
+ // Inner and full outer joins are also accepted with their sides swapped.
+ x.joinType match {
+ case Inner | FullOuter =>
+ sel_1q.joinEdges.contains(JoinEdge(r, l, x.joinType))
+ case _ => false
+ }
+ }
+ if (joinTypeEquivalent) {
+ val sel_1a_join = sel_1a.extractJoinConditions(
+ sel_1a.children(x.left),
+ sel_1a.children(x.right))
+ val sel_1q_join = sel_1q.extractJoinConditions(
+ sel_1q.children(mappedEdge.left),
+ sel_1q.children(mappedEdge.right))
+ sel_1a_join.forall(e => sel_1q_join.exists(e.semanticEquals(_))) &&
+ sel_1q_join.forall(e => sel_1a_join.exists(e.semanticEquals(_)))
+ } else false
+ case _ => false
+ }
+ }
+
+ val isPredicateEmR = sel_1q.predicateList.forall(expr =>
+ sel_1a.predicateList.exists(_.semanticEquals(expr)))
+ val isOutputEmR = sel_1q.outputList.forall(expr =>
+ sel_1a.outputList.exists(_.semanticEquals(expr)))
+ val isOutputRmE = sel_1a.outputList.forall(expr =>
+ sel_1q.outputList.exists(_.semanticEquals(expr)))
+
+ if (r2eJoinsMatch) {
+ if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty) {
+ Seq(sel_1a) // no compensation needed
+ } else {
+ // Build the compensation Select: the subsumer becomes child 0, followed by the
+ // subsumee children that have no counterpart in the subsumer.
+ val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
+ val tAliasMap = new collection.mutable.HashMap[Int, String]()
+
+ val updatedOutList: Seq[NamedExpression] = updateDuplicateColumns(sel_1a)
+ val usel_1a = sel_1a.copy(outputList = updatedOutList)
+ tChildren += usel_1a
+ tAliasMap += (tChildren.indexOf(usel_1a) -> rewrite.newSubsumerName())
+
+ sel_1q.children.zipWithIndex.foreach {
+ case (childe, idx) =>
+ if (e2r.get(idx).isEmpty) {
+ tChildren += childe
+ // `map` is used only for its side effect of registering the child's alias.
+ sel_1q.aliasMap.get(idx).map(x => tAliasMap += (tChildren.indexOf(childe) -> x))
+ }
+ }
+
+ // Re-index the subsumee join edges against tChildren. An endpoint covered by the
+ // subsumer maps to index 0; edges with both endpoints inside the subsumer become
+ // null here and are filtered out below.
+ val tJoinEdges = sel_1q.joinEdges.collect {
+ case JoinEdge(le, re, joinType) =>
+ (e2r.get(le), e2r.get(re)) match {
+ case (Some(lr), None) =>
+ JoinEdge(
+ 0,
+ tChildren.indexOf(sel_1q.children(re)),
+ joinType)
+ case (None, None) =>
+ JoinEdge(
+ tChildren.indexOf(sel_1q.children(le)),
+ tChildren.indexOf(sel_1q.children(re)),
+ joinType)
+ case (None, Some(rr)) =>
+ JoinEdge(
+ tChildren.indexOf(sel_1q.children(le)),
+ 0,
+ joinType)
+ case _ =>
+ null.asInstanceOf[JoinEdge]
+ }
+ }
+ // Keep only the subsumee predicates that the subsumer does not already apply.
+ val tPredicateList = sel_1q.predicateList.filter { p =>
+ !sel_1a.predicateList.exists(_.semanticEquals(p)) }
+ val wip = sel_1q.copy(
+ predicateList = tPredicateList,
+ children = tChildren,
+ joinEdges = tJoinEdges.filter(_ != null),
+ aliasMap = tAliasMap.toMap)
+
+ val done = factorOutSubsumer(wip, usel_1a, wip.aliasMap)
+ Seq(done)
+ }
+ } else Nil
+ } else Nil
+
+ // Case 2: both Selects sit on GroupBy children (exactly one child each, see isSingleChild).
+ case (
+ sel_3a @ modular.Select(_, _, _, _, _, _, _, _, _, _),
+ sel_3q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None)
+ if sel_3a.children.forall(_.isInstanceOf[GroupBy]) &&
+ sel_3q.children.forall(_.isInstanceOf[GroupBy]) =>
+ val isPredicateRmE = sel_3a.predicateList.isEmpty ||
+ sel_3a.predicateList.forall(expr =>
+ sel_3q.predicateList.exists(_.semanticEquals(expr)))
+ val isPredicateEmdR = sel_3q.predicateList.isEmpty ||
+ sel_3q.predicateList.forall(expr =>
+ sel_3a.predicateList.exists(_.semanticEquals(expr)) ||
+ isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
+ val isOutputEdR = sel_3q.outputList.forall(expr =>
+ isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
+ val isSingleChild = sel_3a.children.length == 1 && sel_3q.children.length == 1
+
+ if (isPredicateRmE && isPredicateEmdR && isOutputEdR && isSingleChild) {
+ val isPredicateEmR = sel_3q.predicateList.isEmpty ||
+ sel_3q.predicateList.forall(expr =>
+ sel_3a.predicateList.exists(_.semanticEquals(expr)))
+ val isOutputRmE = sel_3a.outputList.forall(expr =>
+ isDerivable(expr, sel_3q.outputList, sel_3a, sel_3q, None))
+ // NOTE(review): this is the same expression as isOutputEdR above, which is already
+ // known to be true inside this branch.
+ val isOutputEmR = sel_3q.outputList.forall(expr =>
+ isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
+
+ if (isPredicateEmR && isOutputEmR && isOutputRmE) {
+ Seq(sel_3a)
+ } else if (isPredicateEmR && isOutputEmR) {
+ // The subsumer output is not fully derivable from the subsumee (isOutputRmE is false),
+ // so the subsumee is re-projected on top of sel_3a.
+ // NOTE(review): `.get` throws NoSuchElementException when no alias in
+ // sel_3a.outputList has a child semantically equal to the query alias' child;
+ // isDerivable can also succeed through its other conditions - confirm this is safe.
+ val sel_3q_exp = sel_3q.transformExpressions({
+ case a: Alias => sel_3a.outputList
+ .find { a1 =>
+ a1.isInstanceOf[Alias] &&
+ a1.asInstanceOf[Alias].child.semanticEquals(a.child)
+ }.map(_.toAttribute).get
+ })
+ val wip = sel_3q_exp.copy(
+ children = Seq(sel_3a),
+ aliasMap = Seq(0 -> rewrite.newSubsumerName()).toMap)
+ val done = factorOutSubsumer(wip, sel_3a, wip.aliasMap)
+ Seq(done)
+ } else {
+ Nil
+ }
+ } else Nil
+
+ case _ => Nil
+ }
+ }
+
+ /**
+  * Returns the output list of `sel_1a` in which every column whose name occurs more than once
+  * is wrapped in an alias named after its qualified name (the expression id is kept).
+  * Columns with a unique name are returned unchanged.
+  */
+ private def updateDuplicateColumns(sel_1a: Select) = {
+   // Names that are used by more than one output column.
+   val repeatedNames = sel_1a.outputList
+     .groupBy(_.name)
+     .collect { case (name, cols) if cols.length > 1 => name }
+     .toSet
+
+   sel_1a.outputList.map { col =>
+     if (repeatedNames.contains(col.name)) {
+       Alias(col, col.qualifiedName)(exprId = col.exprId)
+     } else {
+       col
+     }
+   }
+ }
+}
+
+ /**
+  * Pattern for a GroupBy subsumer and a GroupBy subsumee that arrive without compensation.
+  */
+ object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern {
+
+   /**
+    * When both plans group by semantically equal expressions and every subsumee output is
+    * found in the subsumer, the subsumer is returned with its output renamed to the names used
+    * by the subsumee. When only the subsumee grouping is covered by the subsumer, the result of
+    * `Utils.tryMatch` is used, with the Select children of its child replaced by the subsumer.
+    * Every other input yields `Nil`.
+    */
+   def apply(
+       subsumer: ModularPlan,
+       subsumee: ModularPlan,
+       compensation: Option[ModularPlan],
+       rewrite: QueryRewrite): Seq[ModularPlan] = {
+     (subsumer, subsumee, compensation) match {
+       case (gb_2a: modular.GroupBy, gb_2q: modular.GroupBy, None) =>
+         val isGroupingEmR = gb_2q.predicateList.forall { queryKey =>
+           gb_2a.predicateList.exists(_.semanticEquals(queryKey))
+         }
+         val isGroupingRmE = gb_2a.predicateList.forall { viewKey =>
+           gb_2q.predicateList.exists(_.semanticEquals(viewKey))
+         }
+
+         if (!isGroupingEmR) {
+           Nil
+         } else if (isGroupingRmE) {
+           // Same grouping on both sides: every query output must exist in the subsumer.
+           val isOutputEmR = gb_2q.outputList.forall {
+             case queryAlias: Alias =>
+               gb_2a.outputList.exists {
+                 case viewAlias: Alias => viewAlias.child.semanticEquals(queryAlias.child)
+                 case _ => false
+               }
+             case queryOut => gb_2a.outputList.exists(_.semanticEquals(queryOut))
+           }
+
+           if (!isOutputEmR) {
+             Nil
+           } else {
+             // Pair each subsumer output with the semantically equal subsumee output (or with
+             // itself when there is none) and take over the subsumee's name where it differs.
+             val oList = gb_2a.outputList.map { viewOut =>
+               val queryOut = gb_2q.outputList.find {
+                 case a: Alias if viewOut.isInstanceOf[Alias] =>
+                   a.child.semanticEquals(viewOut.children.head)
+                 case a: Alias => a.child.semanticEquals(viewOut)
+                 case other => other.semanticEquals(viewOut)
+               }.getOrElse(viewOut)
+
+               if (viewOut.name == queryOut.name) {
+                 viewOut
+               } else {
+                 viewOut match {
+                   case alias: Alias => Alias(alias.child, queryOut.name)(exprId = alias.exprId)
+                   case _ => Alias(viewOut, queryOut.name)(exprId = queryOut.exprId)
+                 }
+               }
+             }
+
+             Seq(gb_2a.copy(outputList = oList))
+           }
+         } else {
+           // The subsumer groups by more expressions than the subsumee.
+           val aliasMap = AttributeMap(gb_2a.outputList.collect {
+             case a: Alias => (a.toAttribute, a)
+           })
+           Utils.tryMatch(gb_2a, gb_2q, aliasMap) match {
+             case Some(g: GroupBy) =>
+               val rewiredChildren = g.child.children.map {
+                 case _: modular.Select => gb_2a
+                 case other => other
+               }
+               Seq(g.copy(child = g.child.withNewChildren(rewiredChildren)))
+             case _ => Nil
+           }
+         }
+
+       case _ => Nil
+     }
+   }
+ }
+
+object GroupbyGroupbySelectOnlyChildDelta extends DefaultMatchPattern with PredicateHelper {
+ /**
+  * Checks whether `exprE` can be obtained from the expressions in `exprListR`.
+  *
+  * Only grouping expressions of the subsumee and predicates of the compensation Select are
+  * considered; such an expression is derivable when `exprListR` holds a semantically equal
+  * expression or `canEvaluate` accepts it. Anything else is not derivable.
+  * A RuntimeException is thrown when the compensation is consulted but is `None`.
+  */
+ private def isDerivable(
+     exprE: Expression,
+     exprListR: Seq[Expression],
+     subsumee: ModularPlan,
+     subsumer: ModularPlan,
+     compensation: Option[ModularPlan]) = {
+   // Evaluated only when exprE is not a grouping expression of the subsumee.
+   def isCompensationPredicate: Boolean = compensation match {
+     case Some(plan) => plan.asInstanceOf[Select].predicateList.contains(exprE)
+     case None => throw new RuntimeException("compensation cannot be None")
+   }
+
+   if (subsumee.asInstanceOf[GroupBy].predicateList.contains(exprE)) {
+     exprListR.exists(_.semanticEquals(exprE)) || canEvaluate(exprE, exprListR)
+   } else if (isCompensationPredicate) {
+     canEvaluate(exprE, exprListR) || exprListR.exists(_.semanticEquals(exprE))
+   } else {
+     false
+   }
+ }
+
+ /**
+  * Matches a GroupBy subsumer against a GroupBy subsumee whose child match produced a Select
+  * compensation.
+  *
+  * The pattern only applies when neither GroupBy carries the EXPAND flag and the compensation
+  * tree contains no GroupBy node. If the grouping expressions and the compensation predicates
+  * can be derived from the subsumer's grouping expressions (plus the output of the rejoined
+  * children), the compensation Select is pulled up above the subsumer. The result is a single
+  * rewritten plan, or `Nil` when the pattern does not apply.
+  */
+ def apply(
+     subsumer: ModularPlan,
+     subsumee: ModularPlan,
+     compensation: Option[ModularPlan],
+     rewrite: QueryRewrite): Seq[ModularPlan] = {
+   // NOTE(review): `collect` only inspects the top-level entries of `subsumee.expressions`;
+   // aggregate expressions nested inside other expressions are not visited - confirm intent.
+   val aggInputEinR = subsumee.expressions
+     .collect { case agg: aggregate.AggregateExpression =>
+       AttributeSet(Seq(agg)).subsetOf(subsumer.outputSet)
+     }.forall(identity)
+   // True when the compensation (if any) contains no GroupBy node. The previous code looked up
+   // the companion object `modular.GroupBy` in a list of Class objects, which can never match,
+   // so the flag was always true.
+   val compensationSelectOnly =
+     compensation.forall(_.collect { case g: GroupBy => g }.isEmpty)
+
+   (subsumer, subsumee, compensation, aggInputEinR, compensationSelectOnly) match {
+     case (
+       gb_2a @ modular.GroupBy(_, _, _, _, _, _, _, _),
+       gb_2q @ modular.GroupBy(_, _, _, _, _, _, _, _),
+       Some(sel_1c1 @ modular.Select(_, _, _, _, _, _, _, _, _, _)),
+       true,
+       true)
+       if !gb_2q.flags.hasFlag(EXPAND) && !gb_2a.flags.hasFlag(EXPAND) =>
+
+       // Output of every compensation child except the first one.
+       val rejoinOutputList = sel_1c1.children.tail.flatMap(_.output)
+       val derivationSource = gb_2a.predicateList ++ rejoinOutputList
+       val isGroupingEdR = gb_2q.predicateList.forall(expr =>
+         isDerivable(expr, derivationSource, gb_2q, gb_2a, compensation))
+       val needRegrouping = !gb_2a.predicateList.forall(gb_2q.predicateList.contains)
+       val canPullup = sel_1c1.predicateList.forall(expr =>
+         isDerivable(expr, derivationSource, gb_2q, gb_2a, compensation))
+       // NOTE(review): only top-level entries of the output list are inspected here as well.
+       val isAggEmR = gb_2q.outputList.collect {
+         case agg: aggregate.AggregateExpression =>
+           gb_2a.outputList.exists(_.semanticEquals(agg))
+       }.forall(identity)
+
+       // Equivalent to the former `(!needRegrouping && isAggEmR) || needRegrouping`.
+       if (isGroupingEdR && (needRegrouping || isAggEmR) && canPullup) {
+         // pull up
+         val pullupOutputList = gb_2a.outputList.map(_.toAttribute) ++ rejoinOutputList
+         val sel_2c1 = sel_1c1.copy(
+           outputList = pullupOutputList,
+           inputList = pullupOutputList,
+           children = sel_1c1.children.map {
+             case _: Select => gb_2a
+             case other => other
+           })
+
+         if (rejoinOutputList.isEmpty) {
+           val aliasMap = AttributeMap(gb_2a.outputList.collect {
+             case a: Alias => (a.toAttribute, a)
+           })
+           Utils.tryMatch(gb_2a, gb_2q, aliasMap).flatMap {
+             case g: GroupBy => Some(g.copy(child = sel_2c1))
+             case _ => None
+           }.map { wip =>
+             factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap)
+           }.map(Seq(_))
+             .getOrElse(Nil)
+         }
+         // TODO: implement regrouping with 1:N rejoin (rejoin tables being the "1" side)
+         // via catalog service
+         else if (!needRegrouping && isAggEmR) {
+           Seq(sel_2c1).map(wip => factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap))
+         } else Nil
+       } else Nil
+
+     case _ => Nil
+   }
+ }
+}
+
+ /**
+  * Placeholder pattern for compensations that contain GroupBy nodes. It is not implemented
+  * yet and returns `Nil` for every input.
+  *
+  * NOTE(review): the name refers to GroupBy plans, but the only explicit case matches two
+  * Select plans - confirm which one is intended before implementing it.
+  */
+ object GroupbyGroupbyGroupbyChildDelta extends DefaultMatchPattern {
+   def apply(
+       subsumer: ModularPlan,
+       subsumee: ModularPlan,
+       compensation: Option[ModularPlan],
+       rewrite: QueryRewrite): Seq[ModularPlan] = {
+     // All GroupBy nodes found in the compensation tree (empty without a compensation).
+     val groupbys = compensation.fold(Set.empty[GroupBy]) { plan =>
+       plan.collect { case g: GroupBy => g }.toSet
+     }
+     val hasGroupByCompensation = groupbys.nonEmpty
+
+     (subsumer, subsumee) match {
+       case (_: modular.Select, _: modular.Select) if hasGroupByCompensation =>
+         // TODO: implement me
+         Nil
+
+       case _ => Nil
+     }
+   }
+ }
+
+
+ /**
+  * Placeholder pattern for a Select subsumer and a Select subsumee whose compensation holds no
+  * GroupBy node. It is not implemented yet and returns `Nil` for every input.
+  */
+ object SelectSelectSelectChildDelta extends DefaultMatchPattern {
+   def apply(
+       subsumer: ModularPlan,
+       subsumee: ModularPlan,
+       compensation: Option[ModularPlan],
+       rewrite: QueryRewrite): Seq[ModularPlan] = {
+     // True when the compensation (if any) contains no GroupBy node. The previous code looked
+     // up the companion object `modular.GroupBy` in a list of Class objects, which can never
+     // match, so the flag was always true.
+     val compensationSelectOnly =
+       compensation.forall(_.collect { case g: GroupBy => g }.isEmpty)
+
+     (subsumer, subsumee, compensationSelectOnly) match {
+       case (
+         modular.Select(_, _, _, _, _, _, _, _, _, _),
+         modular.Select(_, _, _, _, _, _, _, _, _, _),
+         true) =>
+         // TODO: implement me
+         Nil
+       case _ => Nil
+     }
+   }
+ }
+
+object SelectSelectGroupbyChildDelta extends DefaultMatchPattern with PredicateHelper {
+ /** Forwards to `Utils.isDerivable` with the arguments unchanged. */
+ private def isDerivable(
+     exprE: Expression,
+     exprListR: Seq[Expression],
+     subsumee: ModularPlan,
+     subsumer: ModularPlan,
+     compensation: Option[ModularPlan]) =
+   Utils.isDerivable(exprE, exprListR, subsumee, subsumer, compensation)
+
+ /**
+  * Matches a Select subsumer against a Select subsumee when both have a single GroupBy child
+  * and the child match produced a GroupBy compensation.
+  *
+  * When no extra join is needed and the predicates, the output and the compensation's
+  * predicates can all be derived from the subsumer output (plus the output of the rejoined
+  * tables), the compensation is re-rooted on the subsumer and the subsumee's Select is placed
+  * on top of it. The result is a single rewritten plan, or `Nil` otherwise.
+  */
+ def apply(
+ subsumer: ModularPlan,
+ subsumee: ModularPlan,
+ compensation: Option[ModularPlan],
+ rewrite: QueryRewrite): Seq[ModularPlan] = {
+ (subsumer, subsumee, compensation, subsumer.children, subsumee.children) match {
+ // The subsumer pattern requires `Nil` in the third constructor position (presumably the
+ // predicate list - TODO confirm against the Select definition).
+ // NOTE(review): `rchild :: Nil` / `echild :: Nil` are List patterns, so they only match when
+ // `children` is a List at runtime; other Seq implementations fall through to `case _`.
+ // Neither `rchild` nor `echild` is used in the body.
+ case (
+ sel_3a@modular.Select(
+ _, _, Nil, _, _,
+ Seq(gb_2a@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
+ sel_3q@modular.Select(
+ _, _, _, _, _,
+ Seq(gb_2q@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
+ Some(gb_2c@modular.GroupBy(_, _, _, _, _, _, _, _)),
+ rchild :: Nil,
+ echild :: Nil) =>
+ // Leaf nodes (tables) reachable from each side.
+ val tbls_sel_3a = sel_3a.collect { case tbl: modular.LeafNode => tbl }
+ val tbls_sel_3q = sel_3q.collect { case tbl: modular.LeafNode => tbl }
+
+ // extrajoin: tables only the subsumer reads; rejoin: tables only the subsumee reads.
+ val extrajoin = tbls_sel_3a.filterNot(tbls_sel_3q.contains)
+ val rejoin = tbls_sel_3q.filterNot(tbls_sel_3a.contains)
+ val rejoinOutputList = rejoin.flatMap(_.output)
+
+ // NOTE(review): sel_3a's third field is matched as `Nil` above; if that field is the
+ // predicate list, this check is trivially true.
+ val isPredicateRmE = sel_3a.predicateList.forall(expr =>
+ sel_3q.predicateList.exists(_.semanticEquals(expr)) ||
+ gb_2c.predicateList.exists(_.semanticEquals(expr)))
+ val isPredicateEmdR = sel_3q.predicateList
+ .forall(expr =>
+ sel_3a.predicateList.exists(_.semanticEquals(expr)) ||
+ isDerivable(
+ expr,
+ sel_3a.outputList ++ rejoinOutputList,
+ sel_3q,
+ sel_3a,
+ compensation))
+ val isOutputEdR = sel_3q.outputList
+ .forall(expr =>
+ isDerivable(
+ expr,
+ sel_3a.outputList ++ rejoinOutputList,
+ sel_3q,
+ sel_3a,
+ compensation))
+
+ // The predicates of the compensation's Select child and the compensation's own
+ // predicateList must both be derivable before they can be pulled above the subsumer.
+ val canSELPullup = gb_2c.child.isInstanceOf[Select] &&
+ gb_2c.child.asInstanceOf[Select].predicateList
+ .forall(expr =>
+ isDerivable(
+ expr,
+ sel_3a.outputList ++ rejoinOutputList,
+ sel_3q,
+ sel_3a,
+ compensation))
+ val canGBPullup = gb_2c.predicateList
+ .forall(expr =>
+ isDerivable(
+ expr,
+ sel_3a.outputList ++ rejoinOutputList,
+ sel_3q,
+ sel_3a,
+ compensation))
+
+ if (extrajoin.isEmpty && isPredicateRmE &&
+ isPredicateEmdR &&
+ isOutputEdR &&
+ canSELPullup &&
+ canGBPullup) {
+ gb_2c.child match {
+ case s: Select =>
+ // Replace the GroupBy children of the compensation's Select with the subsumer,
+ // marked via setSkip().
+ val sel_3c1 = s.withNewChildren(
+ s.children.map {
+ case gb: GroupBy => sel_3a.setSkip()
+ case other => other })
+ val gb_3c2 = gb_2c.copy(child = sel_3c1)
+
+ // Expand attributes of the subsumee that refer to aliases of the compensation output.
+ val aliasMap_exp = AttributeMap(
+ gb_2c.outputList.collect {
+ case a: Alias => (a.toAttribute, a) })
+ val sel_3q_exp = sel_3q.transformExpressions({
+ case attr: Attribute if aliasMap_exp.contains(attr) => aliasMap_exp(attr)
+ })
+ // Mappings of output of two plans by checking semantic equals.
+ // NOTE(review): the fallback indexes gb_2c.outputList with a position taken from
+ // sel_3q_exp.outputList; it throws IndexOutOfBoundsException when no match is found
+ // and gb_2c has fewer output columns - confirm that the lengths always agree.
+ val mappings = sel_3q_exp.outputList.zipWithIndex.map { case(exp, index) =>
+ (exp, gb_2c.outputList.find {
+ case a: Alias if exp.isInstanceOf[Alias] =>
+ a.child.semanticEquals(exp.children.head)
+ case a: Alias => a.child.semanticEquals(exp)
+ case other => other.semanticEquals(exp)
+ }.getOrElse(gb_2c.outputList(index)))
+ }
+
+ // Use the compensation's output, renamed to the subsumee's name where they differ.
+ val oList = for ((o1, o2) <- mappings) yield {
+ if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
+ }
+
+ val wip = sel_3q_exp.copy(outputList = oList, children = Seq(gb_3c2))
+ // The Option is always defined here, so the result is always a one-element Seq.
+ val sel_3c3 = Some(factorOutSubsumer(wip, sel_3a, s.aliasMap))
+ sel_3c3.map(Seq(_)).getOrElse(Nil)
+
+ // Not reachable: canSELPullup already requires gb_2c.child to be a Select.
+ case _ => Nil
+ }
+ } else {
+ Nil
+ }
+
+ case _ => Nil
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala
new file mode 100644
index 0000000..2a4da27
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+// TODO: implement this to modularize DefaultMatchingFunctions
+ /** Placeholder companion; it declares no members yet. */
+ object MatchConditions
+
+ /**
+  * Placeholder for match conditions; the flag lookup is not implemented yet.
+  *
+  * @param flags flag value; no member reads it so far
+  */
+ class MatchConditions(flags: Long) {
+
+   /** Not implemented: always throws an UnsupportedOperationException. */
+   def hasFlag(flag: Long): Boolean = throw new UnsupportedOperationException
+ }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
new file mode 100644
index 0000000..2c5d8f4
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.trees.TreeNode
+
+ /**
+  * A single matching rule that tries to express one plan (the subsumee) in terms of another
+  * plan (the subsumer).
+  *
+  * @tparam MatchingPlan type of the plan trees the pattern operates on
+  */
+ abstract class MatchPattern[MatchingPlan <: TreeNode[MatchingPlan]] extends Logging {
+
+ /**
+  * Applies this pattern to a subsumer/subsumee pair.
+  *
+  * @param subsumer     plan that may subsume the other plan
+  * @param subsumee     plan that should be expressed through the subsumer
+  * @param compensation optional compensation plan passed in by the caller
+  * @param rewrite      the [[QueryRewrite]] in progress
+  * @return the plans produced by this pattern
+  */
+ def apply(
+ subsumer: MatchingPlan,
+ subsumee: MatchingPlan,
+ compensation: Option[MatchingPlan],
+ rewrite: QueryRewrite): Seq[MatchingPlan]
+
+ }
+
+ /**
+  * Runs a sequence of [[MatchPattern]]s against a subsumer/subsumee pair.
+  *
+  * @tparam MatchingPlan type of the plan trees the patterns operate on
+  */
+ abstract class MatchMaker[MatchingPlan <: TreeNode[MatchingPlan]] {
+
+   /** Define a sequence of rules, to be overridden by the implementation. */
+   protected val patterns: Seq[MatchPattern[MatchingPlan]]
+
+   /**
+    * Applies the patterns in their declared order and returns an iterator over all plans
+    * they produce. The patterns are traversed through a view of the sequence.
+    */
+   def execute(
+       subsumer: MatchingPlan,
+       subsumee: MatchingPlan,
+       compensation: Option[MatchingPlan],
+       rewrite: QueryRewrite): Iterator[MatchingPlan] =
+     patterns.view
+       .flatMap(pattern => pattern(subsumer, subsumee, compensation, rewrite))
+       .toIterator
+ }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
new file mode 100644
index 0000000..545920e
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSet}
+
+import org.apache.carbondata.mv.datamap.{MVHelper, MVState}
+import org.apache.carbondata.mv.expressions.modular._
+import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, Select}
+import org.apache.carbondata.mv.plans.modular
+
+private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVState) {
+
+ /**
+  * Rewrites `plan` using the registered summary datasets.
+  *
+  * Subquery expressions without children are rewritten first (each becomes a
+  * `ScalarModularSubquery` over its rewritten plan); afterwards the plan itself is rewritten.
+  *
+  * @throws UnsupportedOperationException if the plan contains a `ModularSubquery` that has
+  *                                       children
+  */
+ def rewriteWithSummaryDatasets(plan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
+   val withRewrittenSubqueries = plan.transformAllExpressions {
+     case subquery: ModularSubquery if subquery.children.isEmpty =>
+       ScalarModularSubquery(
+         rewriteWithSummaryDatasetsCore(subquery.plan, rewrite),
+         subquery.children,
+         subquery.exprId)
+     case subquery: ModularSubquery =>
+       throw new UnsupportedOperationException(s"Rewrite expression $subquery isn't supported")
+     case other => other
+   }
+   rewriteWithSummaryDatasetsCore(withRewrittenSubqueries, rewrite)
+ }
+
+ /**
+  * Rewrites the fragments of `plan` top-down, ignoring subquery expressions.
+  *
+  * Every fragment that is not yet rewritten and is SPJGH is matched against the feasible
+  * summary datasets of the catalog; the first successful compensation replaces the fragment
+  * and is flagged as rewritten. If anything was rewritten, the datamap table is then
+  * updated in every `Select` and `GroupBy` of the result.
+  */
+ def rewriteWithSummaryDatasetsCore(plan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
+   val rewrittenPlan = plan.transformDown {
+     case fragment if fragment.rewritten || !fragment.isSPJGH => fragment
+     case fragment =>
+       // The stream keeps the search lazy: only the first compensation is ever computed.
+       val compensations = for {
+         dataset <- catalog.lookupFeasibleSummaryDatasets(fragment).toStream
+         subsumer <- session.modularizer
+           .modularize(session.optimizer.execute(dataset.plan))
+           .map(_.harmonized)
+         subsumee <- unifySubsumee(fragment)
+         comp <- subsume(
+           unifySubsumer2(unifySubsumer1(subsumer, subsumee, dataset.relation), subsumee),
+           subsumee,
+           rewrite)
+       } yield comp
+       compensations.headOption.map(_.setRewritten()).getOrElse(fragment)
+   }
+
+   if (rewrittenPlan.find(_.rewritten).isEmpty) {
+     rewrittenPlan
+   } else {
+     // The plan was rewritten, so make sure the datamap table is updated in the plan too.
+     val updatedPlan = rewrittenPlan.transform {
+       case select: Select => MVHelper.updateDataMap(select, rewrite)
+       case groupBy: GroupBy => MVHelper.updateDataMap(groupBy, rewrite)
+     }
+     // TODO Find a better way to set the rewritten flag, it may fail in some conditions.
+     // The flag is copied positionally, which relies on both plans listing their nodes in
+     // the same order.
+     val nodesBefore = rewrittenPlan.collect { case m: ModularPlan => m }
+     val nodesAfter = updatedPlan.collect { case m: ModularPlan => m }
+     nodesBefore.zip(nodesAfter).foreach { case (before, after) =>
+       if (before.rewritten) after.setRewritten()
+     }
+     updatedPlan
+   }
+ }
+
+ /**
+  * Tries to express `subsumee` in terms of `subsumer`.
+  *
+  * Both plans must be of the same class. Plans whose children are all leaf nodes are handed
+  * to the matcher directly. Plans with exactly one child each are matched bottom-up: the
+  * children are matched first and, if that succeeds, the parents are matched with the
+  * child's compensation (or with none when the compensation is the subsumer's child itself).
+  *
+  * @param subsumer plan that is matched against
+  * @param subsumee plan being matched
+  * @param rewrite  the rewrite session this match belongs to
+  * @return the first plan produced by the matcher, or `None` when the shapes are not
+  *         supported or the matcher produces nothing
+  */
+ def subsume(
+     subsumer: ModularPlan,
+     subsumee: ModularPlan,
+     rewrite: QueryRewrite): Option[ModularPlan] = {
+   // First plan the matcher produces for this pair, if any.
+   def firstMatch(compensation: Option[ModularPlan]): Option[ModularPlan] = {
+     val matches = session.matcher.execute(subsumer, subsumee, compensation, rewrite)
+     if (matches.hasNext) Some(matches.next()) else None
+   }
+
+   if (subsumer.getClass != subsumee.getClass) {
+     None
+   } else {
+     (subsumer.children, subsumee.children) match {
+       case (Nil, Nil) => None
+
+       case (rchildren, echildren)
+         if rchildren.forall(_.isInstanceOf[modular.LeafNode]) &&
+            echildren.forall(_.isInstanceOf[modular.LeafNode]) =>
+         firstMatch(None)
+
+       // `Seq(x)` rather than `x :: Nil`: children is a Seq, and `::` only matches a List,
+       // so a single child held in any other Seq implementation would be skipped.
+       case (Seq(rchild), Seq(echild)) =>
+         subsume(rchild, echild, rewrite).flatMap { childCompensation =>
+           if (childCompensation.eq(rchild)) firstMatch(None)
+           else firstMatch(Some(childCompensation))
+         }
+
+       case _ => None
+     }
+   }
+ }
+
+ /**
+  * Puts `rchild` underneath `subsume` when `rchild` is a `Select` or `GroupBy` that carries
+  * a datamap table relation; otherwise returns `subsume` unchanged.
+  *
+  * NOTE(review): no caller is visible in this class - confirm whether this is still needed.
+  */
+ private def updateDatamap(rchild: ModularPlan, subsume: ModularPlan) = {
+   val hasDataMapRelation = rchild match {
+     case s: Select => s.dataMapTableRelation.isDefined
+     case g: GroupBy => g.dataMapTableRelation.isDefined
+     case _ => false
+   }
+
+   subsume match {
+     case s: Select if hasDataMapRelation => s.copy(children = Seq(rchild))
+     case g: GroupBy if hasDataMapRelation => g.copy(child = rchild)
+     case _ => subsume
+   }
+ }
+
+ /**
+  * Adds a `Select` operator as a placeholder on top of the subsumee to facilitate matching.
+  *
+  * Only a `GroupBy` whose child is a `Select` is wrapped; the placeholder simply projects
+  * the group-by's output list. Any other plan is returned as is.
+  *
+  * @return always a defined option (the wrapped or the original plan)
+  */
+ def unifySubsumee(subsumee: ModularPlan): Option[ModularPlan] = {
+   val unified = subsumee match {
+     case groupBy @ modular.GroupBy(_, _, _, _,
+       modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _) =>
+       Select(
+         groupBy.outputList,
+         groupBy.outputList,
+         Nil,
+         Map.empty,
+         Nil,
+         groupBy :: Nil,
+         groupBy.flags,
+         groupBy.flagSpec,
+         Seq.empty)
+     case other => other
+   }
+   Some(unified)
+ }
+
+ /**
+  * Attaches the datamap table relation to the subsumer and, when needed, adds a `Select`
+  * operator as a placeholder on top of it to facilitate matching.
+  *
+  * The placeholder is added only when the subsumer is a `GroupBy` over a `Select` and the
+  * subsumee is a `Select` over exactly one such `GroupBy`. The returned plan is always
+  * marked via `setSkip()`.
+  *
+  * @param subsumer        modular plan of the summary dataset
+  * @param subsumee        unified plan of the query fragment
+  * @param dataMapRelation plan that is recorded as the subsumer's datamap table relation
+  */
+ def unifySubsumer1(
+     subsumer: ModularPlan,
+     subsumee: ModularPlan,
+     dataMapRelation: ModularPlan): ModularPlan = {
+   // Update datamap table relation to the subsumer modular plan
+   val withRelation = subsumer match {
+     case select: Select => select.copy(dataMapTableRelation = Some(dataMapRelation))
+     case groupBy: GroupBy => groupBy.copy(dataMapTableRelation = Some(dataMapRelation))
+     case other => other
+   }
+
+   val unified = (withRelation, subsumee) match {
+     case (
+       topGroupBy @ modular.GroupBy(_, _, _, _,
+         modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _),
+       modular.Select(_, _, _, _, _,
+         Seq(modular.GroupBy(_, _, _, _,
+           modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)),
+         _, _, _, _)) =>
+       modular.Select(
+         topGroupBy.outputList,
+         topGroupBy.outputList,
+         Nil,
+         Map.empty,
+         Nil,
+         topGroupBy :: Nil,
+         topGroupBy.flags,
+         topGroupBy.flagSpec,
+         Seq.empty)
+     case _ => withRelation
+   }
+   unified.setSkip()
+ }
+
+ /**
+  * Makes the subsumer refer to the very same leaf nodes and attributes as the subsumee.
+  *
+  * For every pair of equal leaf nodes, the subsumer's leaf is replaced by the subsumee's
+  * one and every attribute produced by the old leaf is replaced by the positionally
+  * corresponding attribute of the new leaf, keeping the original qualifier.
+  */
+ def unifySubsumer2(subsumer: ModularPlan, subsumee: ModularPlan): ModularPlan = {
+   val subsumerTables = subsumer.collect { case leaf: modular.LeafNode => leaf }
+   val subsumeeTables = subsumee.collect { case leaf: modular.LeafNode => leaf }
+   val matchingTables = subsumerTables.flatMap { rtable =>
+     subsumeeTables.filter(rtable == _).map(etable => (rtable, etable))
+   }
+
+   matchingTables.foldLeft(subsumer) { case (current, (rtable, etable)) =>
+     val withSharedTable = current.transform { case `rtable` => etable }
+     val replacedAttributes = AttributeSet(rtable.output)
+     val replacements = AttributeMap(rtable.output.zip(etable.output))
+     withSharedTable.transformUp {
+       case node => node.transformExpressions {
+         case attr: Attribute if replacedAttributes.contains(attr) =>
+           replacements(attr).withQualifier(attr.qualifier)
+       }
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
new file mode 100644
index 0000000..5039d66
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.datamap.MVState
+import org.apache.carbondata.mv.plans.modular.ModularPlan
+
+/**
+ * The primary workflow for rewriting relational queries using Spark libraries.
+ *
+ * Each stage is a lazy value that builds on the previous one: the logical plan is optimized,
+ * modularized and harmonized, rewritten with the summary datasets, and finally rendered as
+ * SQL.
+ */
+class QueryRewrite private (
+    state: MVState,
+    logical: LogicalPlan,
+    nextSubqueryId: AtomicLong) {
+
+  /** Creates a rewrite whose generated subsumer names start at 0. */
+  def this(state: MVState, logical: LogicalPlan) = this(state, logical, new AtomicLong(0))
+
+  /** Returns a fresh name of the form `gen_subsumer_<n>`; every call consumes one id. */
+  def newSubsumerName(): String = {
+    val id = nextSubqueryId.getAndIncrement()
+    s"gen_subsumer_$id"
+  }
+
+  /** The logical plan after running the optimizer of the MV state. */
+  lazy val optimizedPlan: LogicalPlan = state.optimizer.execute(logical)
+
+  /** The first modular plan the modularizer yields for the optimized plan, harmonized. */
+  lazy val modularPlan: ModularPlan = {
+    val candidates = state.modularizer.modularize(optimizedPlan)
+    candidates.next().harmonized
+  }
+
+  /** The modular plan rewritten by the navigator with the summary datasets. */
+  lazy val withSummaryData: ModularPlan =
+    state.navigator.rewriteWithSummaryDatasets(modularPlan, this)
+
+  /** Compact SQL text of the rewritten plan. */
+  lazy val toCompactSQL: String = withSummaryData.asCompactSQL
+
+  /** One-line SQL text of the rewritten plan. */
+  lazy val toOneLineSQL: String = withSummaryData.asOneLineSQL
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
new file mode 100644
index 0000000..c29c08f
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.FindDataSourceTable
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+import org.apache.carbondata.core.datamap.DataMapCatalog
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.mv.datamap.{MVHelper, MVState}
+import org.apache.carbondata.mv.plans.modular.{Flags, ModularPlan, ModularRelation, Select}
+import org.apache.carbondata.mv.plans.util.Signature
+
+/**
+ * Holds a summary logical plan together with what is needed to match queries against it.
+ *
+ * @param signature     signature of the modularized summary plan, if it has one
+ * @param plan          logical plan of the summary dataset
+ * @param dataMapSchema schema of the datamap the dataset was registered from
+ * @param relation      modular plan that reads the datamap's table
+ */
+private[mv] case class SummaryDataset(
+    signature: Option[Signature],
+    plan: LogicalPlan,
+    dataMapSchema: DataMapSchema,
+    relation: ModularPlan)
+
+private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
+ extends DataMapCatalog[SummaryDataset] {
+
+ @transient
+ private val summaryDatasets = new scala.collection.mutable.ArrayBuffer[SummaryDataset]
+
+ val mVState = new MVState(this)
+
+ @transient
+ private val registerLock = new ReentrantReadWriteLock
+
+ /**
+ * parser
+ */
+ lazy val parser = new CarbonSpark2SqlParser
+
+ /** Evaluates `f` while holding the catalog's read lock; the lock is released even if `f` throws. */
+ private def readLock[A](f: => A): A = {
+   val read = registerLock.readLock()
+   read.lock()
+   try {
+     f
+   } finally {
+     read.unlock()
+   }
+ }
+
+ /** Evaluates `f` while holding the catalog's write lock; the lock is released even if `f` throws. */
+ private def writeLock[A](f: => A): A = {
+   val write = registerLock.writeLock()
+   write.lock()
+   try {
+     f
+   } finally {
+     write.unlock()
+   }
+ }
+
+ /** Removes every registered summary dataset from the catalog (under the write lock). */
+ private[mv] def refresh(): Unit = writeLock(summaryDatasets.clear())
+
+ /** Tells whether no summary dataset is registered (checked under the read lock). */
+ private[mv] def isEmpty: Boolean = readLock(summaryDatasets.isEmpty)
+
+ /**
+  * Registers the datamap described by `dataMapSchema` as a summary dataset.
+  *
+  * The datamap's CTAS query is analyzed, optimized and modularized to obtain the signature
+  * used for lookups, and a `Select` over the datamap's table is built as the relation that
+  * rewritten queries read from. The whole registration runs under the write lock.
+  *
+  * @param dataMapSchema schema providing the CTAS query and the identifier of the
+  *                      datamap's table
+  */
+ private[mv] def registerSchema(dataMapSchema: DataMapSchema): Unit = {
+   writeLock {
+     // TODO Add mvfunction here, don't use preagg function
+     val updatedQuery = parser.addPreAggFunction(dataMapSchema.getCtasQuery)
+     val analyzedPlan = sparkSession.sql(updatedQuery).queryExecution.analyzed
+     val planToRegister = MVHelper.dropDummFuc(analyzedPlan)
+     val modularPlan = mVState.modularizer
+       .modularize(mVState.optimizer.execute(planToRegister))
+       .next()
+       .harmonized
+     val signature = modularPlan.signature
+
+     // Resolve the datamap's own table to learn its output attributes.
+     val identifier = dataMapSchema.getRelationIdentifier
+     val tableIdentifier =
+       TableIdentifier(identifier.getTableName, Some(identifier.getDatabaseName))
+     val output = new FindDataSourceTable(sparkSession)
+       .apply(sparkSession.sessionState.catalog.lookupRelation(tableIdentifier))
+       .output
+
+     val relation = ModularRelation(
+       identifier.getDatabaseName,
+       identifier.getTableName,
+       output,
+       Flags.NoFlags,
+       Seq.empty)
+     val select = Select(
+       relation.outputList,
+       relation.outputList,
+       Seq.empty,
+       Map(0 -> identifier.getTableName),
+       Seq.empty,
+       Seq(relation),
+       Flags.NoFlags,
+       Seq.empty,
+       Seq.empty,
+       None)
+
+     summaryDatasets += SummaryDataset(signature, planToRegister, dataMapSchema, select)
+   }
+ }
+
+ /**
+  * Removes the summary dataset registered for the datamap named `dataMapName`.
+  *
+  * The name is compared exactly (case-sensitively); only the first matching entry is
+  * removed. The lookup and removal run under the write lock.
+  *
+  * @param dataMapName name of the datamap to remove
+  * @throws IllegalArgumentException if no dataset is registered under that name
+  */
+ private[mv] def unregisterSchema(dataMapName: String): Unit = {
+   writeLock {
+     // `==` is null-safe, unlike calling `equals` on a possibly-null schema name.
+     val dataIndex = summaryDatasets
+       .indexWhere(sd => sd.dataMapSchema.getDataMapName == dataMapName)
+     require(dataIndex >= 0, s"Datamap $dataMapName is not registered.")
+     summaryDatasets.remove(dataIndex)
+   }
+ }
+
+
+ /**
+  * Returns a snapshot of all registered summary datasets.
+  *
+  * The copy is taken under the read lock, like every other read of the underlying mutable
+  * buffer, so that it cannot observe a registration or removal that is in progress.
+  */
+ override def listAllSchema(): Array[SummaryDataset] = readLock(summaryDatasets.toArray)
+
+ /**
+  * Returns feasible registered summary data sets for processing the given ModularPlan.
+  *
+  * A dataset is feasible when its datamap is enabled, both it and the plan have a
+  * signature, both signatures agree on whether they group, and the dataset's tables are a
+  * subset of the plan's tables. The result is ordered by descending number of tables.
+  */
+ private[mv] def lookupFeasibleSummaryDatasets(plan: ModularPlan): Seq[SummaryDataset] = {
+   readLock {
+     val planSignature = plan.signature
+     val enabledDetails = DataMapStatusManager.getEnabledDataMapStatusDetails
+
+     // Only select the enabled datamaps for the query.
+     def isEnabled(dataset: SummaryDataset): Boolean = {
+       enabledDetails.exists(
+         _.getDataMapName.equalsIgnoreCase(dataset.dataMapSchema.getDataMapName))
+     }
+
+     def isFeasible(dataset: SummaryDataset): Boolean = {
+       (dataset.signature, planSignature) match {
+         case (Some(datasetSig), Some(querySig)) =>
+           datasetSig.groupby == querySig.groupby &&
+           datasetSig.datasets.subsetOf(querySig.datasets)
+         case _ => false
+       }
+     }
+
+     // heuristics: more tables involved in a summary data set => higher query reduction factor
+     // (every feasible dataset has a signature, so the fallback size is never used)
+     summaryDatasets
+       .filter(isEnabled)
+       .filter(isFeasible)
+       .sortBy(_.signature.map(_.datasets.size).getOrElse(0))(Ordering[Int].reverse)
+   }
+ }
+}