Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/09 08:44:29 UTC
[11/12] carbondata git commit: [CARBONDATA-2242] Add Materialized View modules
[CARBONDATA-2242] Add Materialized View modules
Initial code for Materialized View DataMap.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/02fd7873
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/02fd7873
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/02fd7873
Branch: refs/heads/master
Commit: 02fd78734cce070cdf9e3dec1a4adac15b789dd3
Parents: be600bc
Author: Jacky Li <ja...@qq.com>
Authored: Fri Mar 9 13:59:25 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 9 16:42:18 2018 +0800
----------------------------------------------------------------------
datamap/mv/core/pom.xml | 99 +
.../org/apache/carbondata/mv/MQOSession.scala | 86 +
.../carbondata/mv/internal/SessionState.scala | 56 +
.../carbondata/mv/internal/SharedState.scala | 65 +
.../mv/rewrite/DefaultMatchMaker.scala | 597 +++
.../carbondata/mv/rewrite/MatchConditions.scala | 28 +
.../carbondata/mv/rewrite/MatchMaker.scala | 47 +
.../carbondata/mv/rewrite/Navigator.scala | 137 +
.../carbondata/mv/rewrite/QueryRewrite.scala | 53 +
.../mv/rewrite/SummaryDatasetCatalog.scala | 176 +
.../apache/carbondata/mv/rewrite/Utils.scala | 358 ++
.../SelectSelectExactChildrenSuite.scala | 76 +
.../carbondata/mv/rewrite/TestSQLSuite.scala | 99 +
.../carbondata/mv/rewrite/Tpcds_1_4_Suite.scala | 90 +
.../mv/rewrite/matching/TestSQLBatch.scala | 214 +
.../rewrite/matching/TestTPCDS_1_4_Batch.scala | 2496 ++++++++++
datamap/mv/plan/pom.xml | 158 +
.../org/apache/carbondata/mv/dsl/package.scala | 101 +
.../mv/expressions/modular/subquery.scala | 169 +
.../mv/plans/modular/AggregatePushDown.scala | 156 +
.../carbondata/mv/plans/modular/Flags.scala | 71 +
.../mv/plans/modular/Harmonizer.scala | 217 +
.../mv/plans/modular/ModularPatterns.scala | 237 +
.../mv/plans/modular/ModularPlan.scala | 205 +
.../modular/ModularPlanSignatureGenerator.scala | 73 +
.../mv/plans/modular/ModularRelation.scala | 143 +
.../mv/plans/modular/Modularizer.scala | 117 +
.../mv/plans/modular/basicOperators.scala | 84 +
.../mv/plans/modular/queryGraph.scala | 24 +
.../apache/carbondata/mv/plans/package.scala | 55 +
.../mv/plans/util/BirdcageOptimizer.scala | 199 +
.../plans/util/Logical2ModularExtractions.scala | 353 ++
.../util/LogicalPlanSignatureGenerator.scala | 101 +
.../carbondata/mv/plans/util/Printers.scala | 347 ++
.../carbondata/mv/plans/util/SQLBuild.scala | 31 +
.../carbondata/mv/plans/util/SQLBuildDSL.scala | 425 ++
.../carbondata/mv/plans/util/SQLBuilder.scala | 260 ++
.../carbondata/mv/plans/util/Signature.scala | 49 +
.../carbondata/mv/plans/util/TableCluster.scala | 55 +
.../mv/testutil/ModularPlanTest.scala | 178 +
.../mv/testutil/Tpcds_1_4_QueryBatch.scala | 4293 ++++++++++++++++++
.../mv/testutil/Tpcds_1_4_Tables.scala | 845 ++++
.../org/apache/carbondata/mv/TestSQLBatch.scala | 468 ++
.../apache/carbondata/mv/TestSQLBatch2.scala | 138 +
.../mv/plans/ExtractJoinConditionsSuite.scala | 67 +
.../carbondata/mv/plans/IsSPJGHSuite.scala | 59 +
.../mv/plans/LogicalToModularPlanSuite.scala | 196 +
.../carbondata/mv/plans/ModularToSQLSuite.scala | 148 +
.../carbondata/mv/plans/SignatureSuite.scala | 97 +
.../mv/plans/Tpcds_1_4_BenchmarkSuite.scala | 88 +
.../scala/org/apache/spark/sql/QueryTest.scala | 352 ++
pom.xml | 22 +-
tools/advisor/README | 64 +
.../input/queries-2017-10-25.json.log.gz | Bin 0 -> 642 bytes
.../input/queries-2017-11-02.json.log.gz | Bin 0 -> 1084 bytes
tools/advisor/output/mv-candidate.sql | 13 +
tools/advisor/pom.xml | 151 +
.../apache/carbondata/mv/tool/MVToolBase.scala | 168 +
.../manager/CommonSubexpressionManager.scala | 498 ++
.../manager/CommonSubexpressionRuleEngine.scala | 44 +
.../manager/CostBasedMVRecommendation.scala | 212 +
.../preprocessor/QueryBatchPreprocessor.scala | 75 +
.../preprocessor/QueryBatchRuleEngine.scala | 35 +
...1-5c81abd56c05e598_1733531031_data.0.parq.gz | Bin 0 -> 462151 bytes
...6-1724efa8090227b0_1233867673_data.0.parq.gz | Bin 0 -> 1797 bytes
...e-990a02877791fb85_1630871489_data.0.parq.gz | Bin 0 -> 7730 bytes
...2-d483027fae4f4b9f_1568698772_data.0.parq.gz | Bin 0 -> 2858 bytes
.../resources/data/files/DIM_APN_IOT.dat.gz | Bin 0 -> 14006 bytes
...b-78c28edfa577c286_1251401173_data.0.parq.gz | Bin 0 -> 16659914 bytes
...sdr_dyn_seq_custer_iot_all_hour_60min.dat.gz | Bin 0 -> 6246209 bytes
.../mv/tool/CostBasedCSEManagerSuite.scala | 241 +
.../apache/carbondata/mv/tool/MVToolSuite.scala | 68 +
.../carbondata/mv/tool/QBProcessorSuite.scala | 98 +
.../mv/tool/constructing/TestSEQ_MVBatch.scala | 244 +
.../constructing/TestTPCDS_1_4_MVBatch.scala | 1233 +++++
.../spark/sql/hive/tpcds/TestHelper.scala | 37 +
.../apache/spark/sql/hive/tpcds/TestHive.scala | 958 ++++
.../sql/hive/tpcds/TestHiveSingleton.scala | 36 +
78 files changed, 19428 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/mv/core/pom.xml b/datamap/mv/core/pom.xml
new file mode 100644
index 0000000..7291567
--- /dev/null
+++ b/datamap/mv/core/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-parent</artifactId>
+ <version>1.4.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>carbondata-mv-core</artifactId>
+ <name>Apache CarbonData :: Materialized View Core</name>
+
+ <properties>
+ <dev.path>${basedir}/../../../dev</dev.path>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-mv-plan</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-spark2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>true</skipTests>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <filereports>CarbonTestSuite.txt</filereports>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/MQOSession.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/MQOSession.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/MQOSession.scala
new file mode 100644
index 0000000..f4e89f2
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/MQOSession.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv
+
+import java.io.Closeable
+import java.math.BigInteger
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.QueryExecution
+
+import org.apache.carbondata.mv.internal.{SessionState, SharedState}
+import org.apache.carbondata.mv.rewrite.QueryRewrite
+
+/**
+ * The entry point for working with multi-query optimization in Carbon. Allows the
+ * creation of CSEs (common subexpressions) as well as query rewrites.
+ */
+class MQOSession private(
+ @transient val spark: SparkSession,
+ @transient private val existingSharedState: Option[SharedState])
+ extends Serializable with Closeable {
+
+ self =>
+
+ def this(spark: SparkSession) = {
+ this(spark, None)
+ }
+
+ /* ----------------------- *
+ | Session-related state |
+ * ----------------------- */
+
+ /**
+ * State shared across sessions, including the `SparkContext`, cached data, listener,
+ * and a catalog that interacts with external systems.
+ */
+ private[mv] lazy val sharedState: SharedState = {
+ existingSharedState.getOrElse(new SharedState(spark.sparkContext))
+ }
+
+ /**
+ * State isolated across sessions, including SQL configurations, temporary tables, registered
+ * functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
+ */
+ @transient
+ private[mv] lazy val sessionState: SessionState = new SessionState(self)
+
+ @transient
+ lazy val tableFrequencyMap = new mutable.HashMap[String, Int]
+
+ @transient
+ lazy val consumersMap = new mutable.HashMap[BigInteger, mutable.Set[LogicalPlan]] with mutable
+ .MultiMap[BigInteger, LogicalPlan]
+
+ def rewrite(sqlText: String): QueryRewrite = {
+ val plan1 = spark.sql(sqlText).queryExecution.analyzed
+ sessionState.rewritePlan(plan1)
+ }
+
+ def rewrite(queryExecution: QueryExecution): QueryRewrite = {
+ val plan1 = queryExecution.analyzed
+ sessionState.rewritePlan(plan1)
+ }
+
+ override def close(): Unit = spark.close()
+
+}
+
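
A minimal usage sketch for MQOSession (illustrative, not part of this commit;
table and column names are made up, and it assumes code running inside the
org.apache.carbondata.mv package, since sharedState is private[mv]):

    import org.apache.spark.sql.SparkSession
    import org.apache.carbondata.mv.MQOSession

    val spark = SparkSession.builder().appName("mv-demo").getOrCreate()
    val mqo = new MQOSession(spark)
    // Register a summary dataset (materialized-view candidate) by its query.
    mqo.sharedState.registerSummaryDataset(
      spark.sql("SELECT seller_id, sum(price) AS total FROM sales GROUP BY seller_id"))
    // Rewrite a user query against the registered summary datasets.
    val rewritten = mqo.rewrite("SELECT seller_id, sum(price) FROM sales GROUP BY seller_id")
    println(rewritten.toCompactSQL)
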
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SessionState.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SessionState.scala
new file mode 100644
index 0000000..07291d0
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SessionState.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.internal
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.MQOSession
+import org.apache.carbondata.mv.plans.modular.SimpleModularizer
+import org.apache.carbondata.mv.plans.util.BirdcageOptimizer
+import org.apache.carbondata.mv.rewrite.{DefaultMatchMaker, Navigator, QueryRewrite}
+
+/**
+ * A class that holds all session-specific state in a given [[MQOSession]].
+ */
+private[mv] class SessionState(mqoSession: MQOSession) {
+
+ // Note: These are all lazy vals because they depend on each other (e.g. conf) and we
+ // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.
+
+ /**
+ * Internal catalog for managing table and database states.
+ */
+ lazy val catalog = mqoSession.sharedState.summaryDatasetCatalog
+
+ /**
+ * Modular query plan modularizer
+ */
+ lazy val modularizer = SimpleModularizer
+
+ /**
+ * Logical query plan optimizer.
+ */
+ lazy val optimizer = BirdcageOptimizer
+
+ lazy val matcher = DefaultMatchMaker
+
+ lazy val navigator: Navigator = new Navigator(catalog, mqoSession)
+
+ def rewritePlan(plan: LogicalPlan): QueryRewrite = new QueryRewrite(mqoSession, plan)
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SharedState.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SharedState.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SharedState.scala
new file mode 100644
index 0000000..afc4565
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SharedState.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.internal
+
+import java.io.File
+
+import scala.io.Source
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.execution.CacheManager
+
+import org.apache.carbondata.mv.rewrite.SummaryDatasetCatalog
+
+/**
+ * A class that holds all state shared across sessions in a given [[SQLContext]].
+ */
+private[mv] class SharedState(val sparkContext: SparkContext) {
+
+ @transient
+ lazy val summaryDatasetCatalog = new SummaryDatasetCatalog(sparkContext)
+
+ def clearSummaryDatasetCatalog(): Unit = summaryDatasetCatalog.clearSummaryDatasetCatalog()
+
+ def registerSummaryDataset(query: DataFrame): Unit = {
+ summaryDatasetCatalog.registerSummaryDataset(query)
+ }
+
+ def unregisterSummaryDataset(query: DataFrame): Unit = {
+ summaryDatasetCatalog.unregisterSummaryDataset(query)
+ }
+
+ def refreshSummaryDatasetCatalog(filePath: String): Unit = {
+ val file = new File(filePath)
+ val sqlContext = new SQLContext(sparkContext)
+ if (file.exists) {
+ clearSummaryDatasetCatalog()
+ for (line <- Source.fromFile(file).getLines) {
+ registerSummaryDataset(sqlContext.sql(line))
+ }
+ }
+ }
+
+ /**
+ * Class for caching query results reused in future executions.
+ */
+ val cacheManager: CacheManager = new CacheManager
+}
+
+object SharedState {}
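
refreshSummaryDatasetCatalog expects a plain text file containing one summary
dataset query per line. A hedged sketch of driving it, given a SharedState
instance (the file path and its contents are hypothetical):

    // /tmp/mv-candidates.sql contains, e.g.:
    //   SELECT seller_id, sum(price) AS total FROM sales GROUP BY seller_id
    // Each line is parsed with SQLContext.sql and registered in the catalog.
    sharedState.refreshSummaryDatasetCatalog("/tmp/mv-candidates.sql")
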
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
new file mode 100644
index 0000000..f549e6e
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular.{GroupBy, JoinEdge, Matchable, ModularPlan, Select}
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.util.SQLBuilder
+
+abstract class DefaultMatchMaker extends MatchMaker[ModularPlan]
+
+abstract class DefaultMatchPattern extends MatchPattern[ModularPlan] {
+
+ /** Name for this pattern, automatically inferred based on class name. */
+ val patternName: String = {
+ val className = getClass.getName
+ if (className endsWith "$") className.dropRight(1) else className
+ }
+
+ def factorOutSubsumer(
+ compensation: ModularPlan,
+ subsumer: Matchable,
+ subsumerName: Option[String]): ModularPlan = {
+ val aliasMap = AttributeMap(
+ subsumer.outputList.collect {
+ case a: Alias if a.child.isInstanceOf[Attribute] =>
+ (a.child.asInstanceOf[Attribute], a.toAttribute)
+ })
+
+ val compensation1 = compensation.transform {
+ case plan if !plan.skip =>
+ plan.transformExpressions {
+ case a: AttributeReference =>
+ aliasMap
+ .get(a)
+ .map { ref =>
+ AttributeReference(
+ ref.name, ref.dataType)(
+ exprId = ref.exprId,
+ qualifier = a.qualifier)
+ }.getOrElse(a)
+ }
+ }
+
+ val subqueryAttributeSet = SQLBuilder.collectAttributeSet(subsumer.outputList)
+ if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) {
+ throw new UnsupportedOperationException(
+ s"duplicate name(s): ${ subsumer.output.mkString(", ") }")
+ }
+ val compensationFinal = compensation1.transformExpressions {
+ case ref: Attribute if subqueryAttributeSet.contains(ref) =>
+ AttributeReference(ref.name, ref.dataType)(exprId = ref.exprId, qualifier = subsumerName)
+ case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) =>
+ Alias(alias.child, alias.name)(exprId = alias.exprId, qualifier = subsumerName)
+ }
+ compensationFinal
+ }
+}
+
+object DefaultMatchMaker extends DefaultMatchMaker {
+ lazy val patterns =
+ SelectSelectNoChildDelta ::
+ GroupbyGroupbyNoChildDelta ::
+ GroupbyGroupbySelectOnlyChildDelta ::
+ GroupbyGroupbyGroupbyChildDelta ::
+ SelectSelectSelectChildDelta ::
+ SelectSelectGroupbyChildDelta :: Nil
+}
+
+/**
+ * Convention:
+ * EmR: each subsumee expression matches some subsumer expression
+ * EdR: each subsumee expression derives from some subsumer expression
+ * RmE: each subsumer expression matches some subsumee expression
+ * RdE: each subsumer expression derives from some subsumee expression
+ */
+
+object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper {
+ private def isDerivable(
+ exprE: Expression,
+ exprListR: Seq[Expression],
+ subsumee: ModularPlan,
+ subsumer: ModularPlan,
+ compensation: Option[ModularPlan]): Boolean = {
+ if (subsumee.asInstanceOf[Select].predicateList.contains(exprE)) {
+ subsumer.asInstanceOf[Select].predicateList.exists(_.semanticEquals(exprE)) ||
+ canEvaluate(exprE, subsumer)
+ } else if (subsumee.asInstanceOf[Select].outputList.contains(exprE)) {
+ exprE match {
+ case a@Alias(_, _) =>
+ exprListR.exists(a1 => a1.isInstanceOf[Alias] &&
+ a1.asInstanceOf[Alias].child.semanticEquals(a.child))
+ case exp => exprListR.exists(_.semanticEquals(exp) || canEvaluate(exp, subsumer))
+ }
+ } else {
+ false
+ }
+ }
+
+ def apply(
+ subsumer: ModularPlan,
+ subsumee: ModularPlan,
+ compensation: Option[ModularPlan],
+ rewrite: QueryRewrite): Seq[ModularPlan] = {
+
+ (subsumer, subsumee, compensation) match {
+ case (
+ sel_1a @ modular.Select(_, _, _, _, _, _, _, _, _),
+ sel_1q @ modular.Select(_, _, _, _, _, _, _, _, _), None
+ ) if sel_1a.children.forall { _.isInstanceOf[modular.LeafNode] } &&
+ sel_1q.children.forall { _.isInstanceOf[modular.LeafNode] } =>
+
+ // Assume the children (including harmonized relations) of subsumer and subsumee
+ // are in 1-1 correspondence.
+ // Change the following two conditions to more complicated ones if we want to
+ // consider plans that combine extrajoin, rejoin, and harmonized relations.
+ val isUniqueRmE = subsumer.children.filter { x => subsumee.children.count(_ == x) != 1 }
+ val isUniqueEmR = subsumee.children.filter { x => subsumer.children.count(_ == x) != 1 }
+
+ val extrajoin = sel_1a.children.filterNot { child => sel_1q.children.contains(child) }
+ val rejoin = sel_1q.children.filterNot { child => sel_1a.children.contains(child) }
+ val rejoinOutputList = rejoin.flatMap(_.output)
+
+ val isPredicateRmE = sel_1a.predicateList.forall(expr =>
+ sel_1q.predicateList.exists(_.semanticEquals(expr)))
+ val isPredicateEmdR = sel_1q.predicateList.forall(expr =>
+ isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))
+ val isOutputEdR = sel_1q.outputList.forall(expr =>
+ isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))
+
+ if (isUniqueRmE.isEmpty && isUniqueEmR.isEmpty && extrajoin.isEmpty && isPredicateRmE &&
+ isPredicateEmdR && isOutputEdR) {
+ val mappings = sel_1a.children.zipWithIndex.map {
+ case (childr, fromIdx) if sel_1q.children.contains(childr) =>
+ val toIndx = sel_1q.children.indexWhere(_ == childr)
+ (toIndx -> fromIdx)
+
+ }
+ val e2r = mappings.toMap
+ val r2e = e2r.map(_.swap)
+ val r2eJoinsMatch = sel_1a.joinEdges.forall { x =>
+ (r2e.get(x.left), r2e.get(x.right)) match {
+ case (Some(l), Some(r)) =>
+ val mappedEdge = JoinEdge(l, r, x.joinType)
+ val joinTypeEquivalent =
+ if (sel_1q.joinEdges.contains(mappedEdge)) true
+ else {
+ x.joinType match {
+ case Inner | FullOuter =>
+ sel_1q.joinEdges.contains(JoinEdge(r, l, x.joinType))
+ case _ => false
+ }
+ }
+ if (joinTypeEquivalent) {
+ val sel_1a_join = sel_1a.extractJoinConditions(
+ sel_1a.children(x.left),
+ sel_1a.children(x.right))
+ val sel_1q_join = sel_1q.extractJoinConditions(
+ sel_1q.children(mappedEdge.left),
+ sel_1q.children(mappedEdge.right))
+ sel_1a_join.forall(e => sel_1q_join.exists(e.semanticEquals(_))) &&
+ sel_1q_join.forall(e => sel_1a_join.exists(e.semanticEquals(_)))
+ } else false
+ case _ => false
+ }
+ }
+
+ val isPredicateEmR = sel_1q.predicateList.forall(expr =>
+ sel_1a.predicateList.exists(_.semanticEquals(expr)))
+ val isOutputEmR = sel_1q.outputList.forall(expr =>
+ sel_1a.outputList.exists(_.semanticEquals(expr)))
+ val isOutputRmE = sel_1a.outputList.forall(expr =>
+ sel_1q.outputList.exists(_.semanticEquals(expr)))
+
+ if (r2eJoinsMatch) {
+ if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty) {
+ Seq(sel_1a) // no compensation needed
+ } else {
+ val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
+ val tAliasMap = new collection.mutable.HashMap[Int, String]()
+
+ tChildren += sel_1a
+ tAliasMap += (tChildren.indexOf(sel_1a) -> rewrite.newSubsumerName())
+
+ sel_1q.children.zipWithIndex.foreach {
+ case (childe, idx) =>
+ if (e2r.get(idx).isEmpty) {
+ tChildren += childe
+ sel_1q.aliasMap.get(idx).map(x => tAliasMap += (tChildren.indexOf(childe) -> x))
+ }
+ }
+
+ val tJoinEdges = sel_1q.joinEdges.collect {
+ case JoinEdge(le, re, joinType) =>
+ (e2r.get(le), e2r.get(re)) match {
+ case (Some(lr), None) =>
+ JoinEdge(
+ 0,
+ tChildren.indexOf(sel_1q.children(re)),
+ joinType)
+ case (None, None) =>
+ JoinEdge(
+ tChildren.indexOf(sel_1q.children(le)),
+ tChildren.indexOf(sel_1q.children(re)),
+ joinType)
+ case (None, Some(rr)) =>
+ JoinEdge(
+ tChildren.indexOf(sel_1q.children(le)),
+ 0,
+ joinType)
+ case _ =>
+ null.asInstanceOf[JoinEdge]
+ }
+ }
+ val tPredicateList = sel_1q.predicateList.filter { p =>
+ !sel_1a.predicateList.exists(_.semanticEquals(p)) }
+ val wip = sel_1q.copy(
+ predicateList = tPredicateList,
+ children = tChildren,
+ joinEdges = tJoinEdges.filter(_ != null),
+ aliasMap = tAliasMap.toMap)
+ val subsumerName = wip.aliasMap.get(0)
+ val done = factorOutSubsumer(wip, sel_1a, subsumerName)
+ Seq(done)
+ }
+ } else Nil
+ } else Nil
+
+ case (
+ sel_3a @ modular.Select(_, _, _, _, _, _, _, _, _),
+ sel_3q @ modular.Select(_, _, _, _, _, _, _, _, _), None)
+ if sel_3a.children.forall(_.isInstanceOf[GroupBy]) &&
+ sel_3q.children.forall(_.isInstanceOf[GroupBy]) =>
+ val isPredicateRmE = sel_3a.predicateList.isEmpty ||
+ sel_3a.predicateList.forall(expr =>
+ sel_3q.predicateList.exists(_.semanticEquals(expr)))
+ val isPredicateEmdR = sel_3q.predicateList.isEmpty ||
+ sel_3q.predicateList.forall(expr =>
+ sel_3a.predicateList.exists(_.semanticEquals(expr)) ||
+ isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
+ val isOutputEdR = sel_3q.outputList.forall(expr =>
+ isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
+ val isSingleChild = sel_3a.children.length == 1 && sel_3q.children.length == 1
+
+ if (isPredicateRmE && isPredicateEmdR && isOutputEdR && isSingleChild) {
+ val isPredicateEmR = sel_3q.predicateList.isEmpty ||
+ sel_3q.predicateList.forall(expr =>
+ sel_3a.predicateList.exists(_.semanticEquals(expr)))
+ val isOutputRmE = sel_3a.outputList.forall(expr =>
+ isDerivable(expr, sel_3q.outputList, sel_3a, sel_3q, None))
+ val isOutputEmR = sel_3q.outputList.forall(expr =>
+ isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
+
+ if (isPredicateEmR && isOutputEmR && isOutputRmE) {
+ Seq(sel_3a) // no compensation needed
+ } else if (isPredicateEmR && isOutputEmR) {
+ val sel_3q_exp = sel_3q.transformExpressions({
+ case a: Alias => sel_3a.outputList
+ .find { a1 =>
+ a1.isInstanceOf[Alias] &&
+ a1.asInstanceOf[Alias].child.semanticEquals(a.child)
+ }.map(_.toAttribute).get
+ })
+ val wip = sel_3q_exp.copy(
+ children = Seq(sel_3a),
+ aliasMap = Seq(0 -> rewrite.newSubsumerName()).toMap)
+ val subsumerName = wip.aliasMap.get(0)
+ val done = factorOutSubsumer(wip, sel_3a, subsumerName)
+ Seq(done)
+ } else {
+ Nil
+ }
+ } else Nil
+
+ case _ => Nil
+ }
+ }
+}
+
+object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern {
+ def apply(
+ subsumer: ModularPlan,
+ subsumee: ModularPlan,
+ compensation: Option[ModularPlan],
+ rewrite: QueryRewrite): Seq[ModularPlan] = {
+ (subsumer, subsumee, compensation) match {
+ case (
+ gb_2a @ modular.GroupBy(_, _, _, _, _, _, _),
+ gb_2q @ modular.GroupBy(_, _, _, _, _, _, _),
+ None) =>
+ val isGroupingEmR = gb_2q.predicateList.forall(expr =>
+ gb_2a.predicateList.exists(_.semanticEquals(expr)))
+ val isGroupingRmE = gb_2a.predicateList.forall(expr =>
+ gb_2q.predicateList.exists(_.semanticEquals(expr)))
+ if (isGroupingEmR && isGroupingRmE) {
+ val isOutputEmR = gb_2q.outputList.forall {
+ case a @ Alias(_, _) => gb_2a.outputList.exists(a1 => a1.isInstanceOf[Alias] &&
+ a1.asInstanceOf[Alias].child
+ .semanticEquals(a.child))
+ case exp => gb_2a.outputList.exists(_.semanticEquals(exp))
+ }
+
+ if (isOutputEmR) {
+ Seq(gb_2a)
+ } else {
+ Nil
+ }
+ } else {
+ val aliasMap = AttributeMap(gb_2a.outputList.collect { case a: Alias =>
+ (a.toAttribute, a)})
+ if (isGroupingEmR) {
+ Utils.tryMatch(
+ gb_2a, gb_2q, aliasMap).flatMap {
+ case g: GroupBy =>
+ Some(g.copy(child = g.child.withNewChildren(
+ g.child.children.map {
+ case modular.Select(_, _, _, _, _, _, _, _, _) => gb_2a;
+ case other => other
+ })));
+ case _ => None}.map(Seq(_)).getOrElse(Nil)
+ } else {
+ Nil
+ }
+ }
+
+ case _ => Nil
+ }
+ }
+}
+
+object GroupbyGroupbySelectOnlyChildDelta extends DefaultMatchPattern with PredicateHelper {
+ private def isDerivable(
+ exprE: Expression,
+ exprListR: Seq[Expression],
+ subsumee: ModularPlan,
+ subsumer: ModularPlan,
+ compensation: Option[ModularPlan]) = {
+ if (subsumee.asInstanceOf[GroupBy].predicateList.contains(exprE)) {
+ if (exprListR.exists(_.semanticEquals(exprE)) || canEvaluate(exprE, exprListR)) true
+ else false
+ } else if (compensation.getOrElse(throw new RuntimeException("compensation cannot be None"))
+ .asInstanceOf[Select].predicateList.contains(exprE)) {
+ if (canEvaluate(exprE, exprListR) || exprListR.exists(_.semanticEquals(exprE))) true
+ else false
+ } else {
+ false
+ }
+ }
+
+ def apply(
+ subsumer: ModularPlan,
+ subsumee: ModularPlan,
+ compensation: Option[ModularPlan],
+ rewrite: QueryRewrite): Seq[ModularPlan] = {
+ val aggInputEinR = subsumee.expressions
+ .collect { case agg: aggregate.AggregateExpression => AttributeSet(Seq(agg))
+ .subsetOf(subsumer.outputSet)
+ }.forall(identity)
+ val compensationSelectOnly = !compensation.map { _.collect { case n => n.getClass } }
+ .exists(_.contains(classOf[modular.GroupBy]))
+
+ (subsumer, subsumee, compensation, aggInputEinR, compensationSelectOnly) match {
+ case (
+ gb_2a @ modular.GroupBy(_, _, _, _, _, _, _),
+ gb_2q @ modular.GroupBy(_, _, _, _, _, _, _),
+ Some(sel_1c1 @ modular.Select(_, _, _, _, _, _, _, _, _)),
+ true,
+ true)
+ if !gb_2q.flags.hasFlag(EXPAND) && !gb_2a.flags.hasFlag(EXPAND) =>
+
+ val rejoinOutputList = sel_1c1.children.tail.flatMap(_.output)
+ val isGroupingEdR = gb_2q.predicateList.forall(expr =>
+ isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q, gb_2a, compensation))
+ val needRegrouping = !gb_2a.predicateList.forall(gb_2q.predicateList.contains)
+ val canPullup = sel_1c1.predicateList.forall(expr =>
+ isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q, gb_2a, compensation))
+ val isAggEmR = gb_2q.outputList.collect {
+ case agg: aggregate.AggregateExpression =>
+ gb_2a.outputList.exists(_.semanticEquals(agg))
+ }.forall(identity)
+
+ if (isGroupingEdR && ((!needRegrouping && isAggEmR) || needRegrouping) && canPullup) {
+ // pull up
+ val pullupOutputList = gb_2a.outputList.map(_.toAttribute) ++ rejoinOutputList
+ val sel_2c1 = sel_1c1.copy(
+ outputList = pullupOutputList,
+ inputList = pullupOutputList,
+ children = sel_1c1.children.map {
+ case s: Select => gb_2a
+ case other => other })
+
+ if (needRegrouping && rejoinOutputList.isEmpty) {
+ val aliasMap = AttributeMap(gb_2a.outputList.collect {
+ case a: Alias => (a.toAttribute, a) })
+ Utils.tryMatch(gb_2a, gb_2q, aliasMap).flatMap {
+ case g: GroupBy => Some(g.copy(child = sel_2c1));
+ case _ => None
+ }.map { wip =>
+ factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap.get(0))
+ }.map(Seq(_))
+ .getOrElse(Nil)
+ }
+ // TODO: implement regrouping with 1:N rejoin (rejoin tables being the "1" side)
+ // via catalog service
+ else if (!needRegrouping && isAggEmR) {
+ Seq(sel_2c1).map(wip => factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap.get(0)))
+ } else Nil
+ } else Nil
+
+ case _ => Nil
+ }
+ }
+}
+
+object GroupbyGroupbyGroupbyChildDelta extends DefaultMatchPattern {
+ def apply(
+ subsumer: ModularPlan,
+ subsumee: ModularPlan,
+ compensation: Option[ModularPlan],
+ rewrite: QueryRewrite): Seq[ModularPlan] = {
+ val groupbys = compensation.map { _.collect { case g: GroupBy => g } }.getOrElse(Nil).toSet
+
+ (subsumer, subsumee, groupbys.nonEmpty) match {
+ case (
+ modular.Select(_, _, _, _, _, _, _, _, _),
+ modular.Select(_, _, _, _, _, _, _, _, _),
+ true) =>
+ // TODO: implement me
+ Nil
+
+ case _ => Nil
+ }
+ }
+}
+
+
+object SelectSelectSelectChildDelta extends DefaultMatchPattern {
+ def apply(
+ subsumer: ModularPlan,
+ subsumee: ModularPlan,
+ compensation: Option[ModularPlan],
+ rewrite: QueryRewrite): Seq[ModularPlan] = {
+ val compensationSelectOnly =
+ !compensation
+ .map { _.collect { case n => n.getClass } }
+ .exists(_.contains(classOf[modular.GroupBy]))
+
+ (subsumer, subsumee, compensationSelectOnly) match {
+ case (
+ modular.Select(_, _, _, _, _, _, _, _, _),
+ modular.Select(_, _, _, _, _, _, _, _, _),
+ true) =>
+ // TODO: implement me
+ Nil
+ case _ => Nil
+ }
+ }
+}
+
+object SelectSelectGroupbyChildDelta extends DefaultMatchPattern with PredicateHelper {
+ private def isDerivable(
+ exprE: Expression,
+ exprListR: Seq[Expression],
+ subsumee: ModularPlan,
+ subsumer: ModularPlan,
+ compensation: Option[ModularPlan]) = {
+ Utils.isDerivable(
+ exprE: Expression,
+ exprListR: Seq[Expression],
+ subsumee: ModularPlan,
+ subsumer: ModularPlan,
+ compensation: Option[ModularPlan])
+ }
+
+ def apply(
+ subsumer: ModularPlan,
+ subsumee: ModularPlan,
+ compensation: Option[ModularPlan],
+ rewrite: QueryRewrite): Seq[ModularPlan] = {
+ (subsumer, subsumee, compensation, subsumer.children, subsumee.children) match {
+ case (
+ sel_3a@modular.Select(
+ _, _, Nil, _, _,
+ Seq(gb_2a@modular.GroupBy(_, _, _, _, _, _, _)), _, _, _),
+ sel_3q@modular.Select(
+ _, _, _, _, _,
+ Seq(gb_2q@modular.GroupBy(_, _, _, _, _, _, _)), _, _, _),
+ Some(gb_2c@modular.GroupBy(_, _, _, _, _, _, _)),
+ rchild :: Nil,
+ echild :: Nil) =>
+ val tbls_sel_3a = sel_3a.collect { case tbl: modular.LeafNode => tbl }
+ val tbls_sel_3q = sel_3q.collect { case tbl: modular.LeafNode => tbl }
+
+ val extrajoin = tbls_sel_3a.filterNot(tbls_sel_3q.contains)
+ val rejoin = tbls_sel_3q.filterNot(tbls_sel_3a.contains)
+ val rejoinOutputList = rejoin.flatMap(_.output)
+
+ val isPredicateRmE = sel_3a.predicateList.forall(expr =>
+ sel_3q.predicateList.exists(_.semanticEquals(expr)) ||
+ gb_2c.predicateList.exists(_.semanticEquals(expr)))
+ val isPredicateEmdR = sel_3q.predicateList
+ .forall(expr =>
+ sel_3a.predicateList.exists(_.semanticEquals(expr)) ||
+ isDerivable(
+ expr,
+ sel_3a.outputList ++ rejoinOutputList,
+ sel_3q,
+ sel_3a,
+ compensation))
+ val isOutputEdR = sel_3q.outputList
+ .forall(expr =>
+ isDerivable(
+ expr,
+ sel_3a.outputList ++ rejoinOutputList,
+ sel_3q,
+ sel_3a,
+ compensation))
+
+ val canSELPullup = gb_2c.child.isInstanceOf[Select] &&
+ gb_2c.child.asInstanceOf[Select].predicateList
+ .forall(expr =>
+ isDerivable(
+ expr,
+ sel_3a.outputList ++ rejoinOutputList,
+ sel_3q,
+ sel_3a,
+ compensation))
+ val canGBPullup = gb_2c.predicateList
+ .forall(expr =>
+ isDerivable(
+ expr,
+ sel_3a.outputList ++ rejoinOutputList,
+ sel_3q,
+ sel_3a,
+ compensation))
+
+ if (extrajoin.isEmpty && isPredicateRmE &&
+ isPredicateEmdR &&
+ isOutputEdR &&
+ canSELPullup &&
+ canGBPullup) {
+ gb_2c.child match {
+ case s: Select =>
+ val sel_3c1 = s.withNewChildren(
+ s.children.map {
+ case gb: GroupBy => sel_3a.setSkip()
+ case other => other })
+ val gb_3c2 = gb_2c.copy(child = sel_3c1)
+
+ val aliasMap_exp = AttributeMap(
+ gb_2c.outputList.collect {
+ case a: Alias => (a.toAttribute, a) })
+ val sel_3q_exp = sel_3q.transformExpressions({
+ case attr: Attribute if aliasMap_exp.contains(attr) => aliasMap_exp(attr)
+ })
+
+ val mappings = sel_3q_exp.outputList zip gb_2c.outputList
+
+ val oList = for ((o1, o2) <- mappings) yield {
+ if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
+ }
+
+ val wip = sel_3q_exp.copy(outputList = oList, children = Seq(gb_3c2))
+ val sel_3c3 = Some(factorOutSubsumer(wip, sel_3a, s.aliasMap.get(0)))
+ sel_3c3.map(Seq(_)).getOrElse(Nil)
+
+ case _ => Nil
+ }
+ } else {
+ Nil
+ }
+
+ case _ => Nil
+ }
+ }
+}
+
+
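To make the pattern names concrete, here is a hedged illustration of the case
SelectSelectNoChildDelta targets (tables, columns, and SQL are made up; the
real matching operates on ModularPlans, not SQL text):

    // Subsumer (the registered summary dataset):
    val subsumerSql = "SELECT a, b FROM fact WHERE c > 0"
    // Subsumee (the user query), differing only by an extra predicate:
    val subsumeeSql = "SELECT a, b FROM fact WHERE c > 0 AND a = 1"
    // Every subsumee predicate and output is derivable from the subsumer's
    // output (isPredicateEmdR / isOutputEdR above), so the pattern compensates
    // by filtering on top of the subsumer, conceptually:
    //   SELECT a, b FROM gen_subsumer_0 WHERE a = 1
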
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala
new file mode 100644
index 0000000..2a4da27
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+// TODO: implement this to modularize DefaultMatchingFunctions
+object MatchConditions {
+}
+
+class MatchConditions(flags: Long) {
+ def hasFlag(flag: Long): Boolean = {
+ throw new UnsupportedOperationException
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
new file mode 100644
index 0000000..2c5d8f4
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.trees.TreeNode
+
+abstract class MatchPattern[MatchingPlan <: TreeNode[MatchingPlan]] extends Logging {
+
+ def apply(
+ subsumer: MatchingPlan,
+ subsumee: MatchingPlan,
+ compensation: Option[MatchingPlan],
+ rewrite: QueryRewrite): Seq[MatchingPlan]
+
+}
+
+abstract class MatchMaker[MatchingPlan <: TreeNode[MatchingPlan]] {
+
+ /** Define a sequence of rules, to be overridden by the implementation. */
+ protected val patterns: Seq[MatchPattern[MatchingPlan]]
+
+ def execute(
+ subsumer: MatchingPlan,
+ subsumee: MatchingPlan,
+ compensation: Option[MatchingPlan],
+ rewrite: QueryRewrite): Iterator[MatchingPlan] = {
+ patterns.view.flatMap(_ (subsumer, subsumee, compensation, rewrite)).toIterator
+ }
+}
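
MatchMaker is designed for extension: an implementation only supplies its
pattern list. A minimal sketch (hypothetical, not in this commit; assumes it
lives in the org.apache.carbondata.mv.rewrite package):

    import org.apache.carbondata.mv.plans.modular.ModularPlan

    // A trivial pattern that accepts the subsumer as-is when the two plans
    // are identical and no compensation is pending.
    object IdentityPattern extends MatchPattern[ModularPlan] {
      def apply(
          subsumer: ModularPlan,
          subsumee: ModularPlan,
          compensation: Option[ModularPlan],
          rewrite: QueryRewrite): Seq[ModularPlan] =
        if (compensation.isEmpty && subsumer == subsumee) Seq(subsumer) else Nil
    }

    object IdentityMatchMaker extends MatchMaker[ModularPlan] {
      protected val patterns = IdentityPattern :: Nil
    }
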
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
new file mode 100644
index 0000000..2a244b9
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSet}
+
+import org.apache.carbondata.mv.MQOSession
+import org.apache.carbondata.mv.expressions.modular._
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular.{ModularPlan, Select}
+
+private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MQOSession) {
+
+ def rewriteWithSummaryDatasets(plan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
+ val replaced = plan.transformAllExpressions {
+ case s: ModularSubquery =>
+ if (s.children.isEmpty) {
+ ScalarModularSubquery(
+ rewriteWithSummaryDatasetsCore(s.plan, rewrite), s.children, s.exprId)
+ }
+ else throw new UnsupportedOperationException(s"Rewrite expression $s isn't supported")
+ case o => o
+ }
+ rewriteWithSummaryDatasetsCore(replaced, rewrite)
+ }
+
+ def rewriteWithSummaryDatasetsCore(plan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
+ plan transformDown {
+ case currentFragment =>
+ if (currentFragment.rewritten || !currentFragment.isSPJGH) currentFragment
+ else {
+ val compensation =
+ (for { dataset <- catalog.lookupFeasibleSummaryDatasets(currentFragment).toStream
+ subsumer <- session.sessionState.modularizer.modularize(
+ session.sessionState.optimizer.execute(dataset.plan)).map(_.harmonized)
+ subsumee <- unifySubsumee(currentFragment)
+ comp <- subsume(unifySubsumer2(unifySubsumer1(subsumer, subsumee), subsumee),
+ subsumee, rewrite)
+ } yield comp).headOption
+ compensation.map(_.setRewritten).getOrElse(currentFragment)
+ }
+ }
+ }
+
+ def subsume(
+ subsumer: ModularPlan,
+ subsumee: ModularPlan,
+ rewrite: QueryRewrite): Option[ModularPlan] = {
+ if (subsumer.getClass == subsumee.getClass) {
+ (subsumer.children, subsumee.children) match {
+ case (Nil, Nil) => None
+ case (r, e) if r.forall(_.isInstanceOf[modular.LeafNode]) &&
+ e.forall(_.isInstanceOf[modular.LeafNode]) =>
+ val iter = session.sessionState.matcher.execute(subsumer, subsumee, None, rewrite)
+ if (iter.hasNext) Some(iter.next)
+ else None
+
+ case (rchild :: Nil, echild :: Nil) =>
+ val compensation = subsume(rchild, echild, rewrite)
+ val oiter = compensation.map {
+ case comp if comp.eq(rchild) =>
+ session.sessionState.matcher.execute(subsumer, subsumee, None, rewrite)
+ case _ =>
+ session.sessionState.matcher.execute(subsumer, subsumee, compensation, rewrite)
+ }
+ oiter.flatMap { case iter if iter.hasNext => Some(iter.next)
+ case _ => None }
+
+ case _ => None
+ }
+ } else None
+ }
+
+ // add Select operator as placeholder on top of subsumee to facilitate matching
+ def unifySubsumee(subsumee: ModularPlan): Option[ModularPlan] = {
+ subsumee match {
+ case gb @ modular.GroupBy(_, _, _, _, modular.Select(_, _, _, _, _, _, _, _, _), _, _) =>
+ Some(
+ Select(gb.outputList, gb.outputList, Nil, Map.empty, Nil, gb :: Nil, gb.flags,
+ gb.flagSpec, Seq.empty))
+ case other => Some(other)
+ }
+ }
+
+ // add Select operator as placeholder on top of subsumer to facilitate matching
+ def unifySubsumer1(subsumer: ModularPlan, subsumee: ModularPlan): ModularPlan = {
+ (subsumer, subsumee) match {
+ case (r @
+ modular.GroupBy(_, _, _, _, modular.Select(_, _, _, _, _, _, _, _, _), _, _),
+ modular.Select(_, _, _, _, _,
+ Seq(modular.GroupBy(_, _, _, _, modular.Select(_, _, _, _, _, _, _, _, _), _, _)),
+ _, _, _)
+ ) =>
+ modular.Select(
+ r.outputList, r.outputList, Nil, Map.empty, Nil, r :: Nil, r.flags,
+ r.flagSpec, Seq.empty).setSkip()
+ case _ => subsumer.setSkip()
+ }
+ }
+
+ def unifySubsumer2(subsumer: ModularPlan, subsumee: ModularPlan): ModularPlan = {
+ val rtables = subsumer.collect { case n: modular.LeafNode => n }
+ val etables = subsumee.collect { case n: modular.LeafNode => n }
+ val pairs = for {
+ rtable <- rtables
+ etable <- etables
+ if (rtable == etable)
+ } yield (rtable, etable)
+
+ pairs.foldLeft(subsumer) {
+ case (curSubsumer, pair) =>
+ val nxtSubsumer = curSubsumer.transform { case pair._1 => pair._2 }
+ val attributeSet = AttributeSet(pair._1.output)
+ val rewrites = AttributeMap(pair._1.output.zip(pair._2.output))
+ nxtSubsumer.transformUp {
+ case p => p.transformExpressions {
+ case a: Attribute if attributeSet contains a => rewrites(a).withQualifier(a.qualifier)
+ }
+ }
+ }
+ }
+}
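
The unify helpers above normalize both plans before pattern matching. A hedged
illustration of unifySubsumee (conceptual plan shapes only):

    // Before: a bare aggregate subsumee
    //   GroupBy(outputList, ..., child = Select(...))
    // After: wrapped in a pass-through Select so that Select-rooted patterns
    // (e.g. SelectSelectNoChildDelta) can fire against a uniform shape
    //   Select(gb.outputList, gb.outputList, Nil, Map.empty, Nil, gb :: Nil, ...)
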
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
new file mode 100644
index 0000000..5cb5cc6
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.MQOSession
+import org.apache.carbondata.mv.plans.modular.ModularPlan
+
+/**
+ * The primary workflow for rewriting relational queries using Spark libraries.
+ */
+class QueryRewrite private (
+ mqoSession: MQOSession,
+ logical: LogicalPlan,
+ nextSubqueryId: AtomicLong) {
+ self =>
+
+ def this(mqoSession: MQOSession, logical: LogicalPlan) =
+ this(mqoSession, logical, new AtomicLong(0))
+
+ def newSubsumerName(): String = s"gen_subsumer_${nextSubqueryId.getAndIncrement()}"
+
+ lazy val optimizedPlan: LogicalPlan =
+ mqoSession.sessionState.optimizer.execute(logical)
+
+ lazy val modularPlan: ModularPlan =
+ mqoSession.sessionState.modularizer.modularize(optimizedPlan).next().harmonized
+
+ lazy val withSummaryData: ModularPlan =
+ mqoSession.sessionState.navigator.rewriteWithSummaryDatasets(modularPlan, self)
+
+ lazy val toCompactSQL: String = withSummaryData.asCompactSQL
+
+ lazy val toOneLineSQL: String = withSummaryData.asOneLineSQL
+}
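
The lazy vals stage the rewrite pipeline, so each step is computed on demand
and at most once. A hedged sketch of walking the stages (the query text is
illustrative):

    val qr = mqoSession.rewrite("SELECT seller_id, sum(price) FROM sales GROUP BY seller_id")
    qr.optimizedPlan   // logical plan after BirdcageOptimizer
    qr.modularPlan     // harmonized modular plan from SimpleModularizer
    qr.withSummaryData // modular plan with matched summary datasets substituted
    qr.toCompactSQL    // the rewritten plan rendered as formatted SQL
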
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
new file mode 100644
index 0000000..812d09d
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.plans.modular.{ModularPlan, SimpleModularizer}
+import org.apache.carbondata.mv.plans.util.{BirdcageOptimizer, Signature}
+
+/** Holds a summary logical plan */
+private[mv] case class SummaryDataset(signature: Option[Signature], plan: LogicalPlan)
+
+private[mv] class SummaryDatasetCatalog(sparkContext: SparkContext) {
+
+ @transient
+ private val summaryDatasets = new scala.collection.mutable.ArrayBuffer[SummaryDataset]
+
+ @transient
+ private val registerLock = new ReentrantReadWriteLock
+
+ private val optimizer = BirdcageOptimizer
+
+ private val modularizer = SimpleModularizer
+
+ // /** Returns true if the table is currently registered in-catalog. */
+ // def isRegistered(tableName: String): Boolean =
+ // lookupSummaryDataset(sqlContext.table(tableName)).nonEmpty
+ //
+ // /** Registers the specified table in-memory catalog. */
+ // def registerTable(tableName: String): Unit =
+ // registerSummaryDataset(sqlContext.table(tableName), Some(tableName))
+ //
+ // /** Removes the specified table from the in-memory catalog. */
+ // def unregisterTable(tableName: String): Unit =
+ // unregisterSummaryDataset(sqlContext.table(tableName))
+ //
+ // override def unregisterAllTables(): Unit = {
+ // clearSummaryDatasetCatalog()
+ // }
+
+ /** Acquires a read lock on the catalog for the duration of `f`. */
+ private def readLock[A](f: => A): A = {
+ val lock = registerLock.readLock()
+ lock.lock()
+ try f finally {
+ lock.unlock()
+ }
+ }
+
+ /** Acquires a write lock on the catalog for the duration of `f`. */
+ private def writeLock[A](f: => A): A = {
+ val lock = registerLock.writeLock()
+ lock.lock()
+ try f finally {
+ lock.unlock()
+ }
+ }
+
+ /** Clears all summary tables. */
+ private[mv] def clearSummaryDatasetCatalog(): Unit = {
+ writeLock {
+ summaryDatasets.clear()
+ }
+ }
+
+ /** Checks if the catalog is empty. */
+ private[mv] def isEmpty: Boolean = {
+ readLock {
+ summaryDatasets.isEmpty
+ }
+ }
+
+ /**
+ * Registers the analyzed logical plan of the given [[DataFrame]] as a summary
+ * dataset, computing its modular-plan signature for later feasibility checks.
+ */
+ private[mv] def registerSummaryDataset(
+ query: DataFrame,
+ tableName: Option[String] = None): Unit = {
+ writeLock {
+ val planToRegister = query.queryExecution.analyzed
+ if (lookupSummaryDataset(planToRegister).nonEmpty) {
+ sys.error(s"Asked to register already registered.")
+ } else {
+ val modularPlan = modularizer.modularize(optimizer.execute(planToRegister)).next()
+ .harmonized
+ val signature = modularPlan.signature
+ summaryDatasets += SummaryDataset(signature, planToRegister)
+ }
+ }
+ }
+
+ /** Removes the given [[DataFrame]] from the catalog */
+ private[mv] def unregisterSummaryDataset(query: DataFrame): Unit = {
+ writeLock {
+ val planToRegister = query.queryExecution.analyzed
+ val dataIndex = summaryDatasets.indexWhere(sd => planToRegister.sameResult(sd.plan))
+ require(dataIndex >= 0, s"Table $query is not registered.")
+ summaryDatasets.remove(dataIndex)
+ }
+ }
+
+ /**
+ * Tries to remove the data set for the given [[DataFrame]] from the catalog if it's registered
+ */
+ private[mv] def tryUnregisterSummaryDataset(
+ query: DataFrame,
+ blocking: Boolean = true): Boolean = {
+ writeLock {
+ val planToRegister = query.queryExecution.analyzed
+ val dataIndex = summaryDatasets.indexWhere(sd => planToRegister.sameResult(sd.plan))
+ val found = dataIndex >= 0
+ if (found) {
+ summaryDatasets.remove(dataIndex)
+ }
+ found
+ }
+ }
+
+ /** Optionally returns registered data set for the given [[DataFrame]] */
+ private[mv] def lookupSummaryDataset(query: DataFrame): Option[SummaryDataset] = {
+ readLock {
+ lookupSummaryDataset(query.queryExecution.analyzed)
+ }
+ }
+
+ /** Optionally returns the registered summary dataset matching the given logical plan. */
+ private[mv] def lookupSummaryDataset(plan: LogicalPlan): Option[SummaryDataset] = {
+ readLock {
+ summaryDatasets.find(sd => plan.sameResult(sd.plan))
+ }
+ }
+
+ /** Returns feasible registered summary data sets for processing the given ModularPlan. */
+ private[mv] def lookupFeasibleSummaryDatasets(plan: ModularPlan): Seq[SummaryDataset] = {
+ readLock {
+ val sig = plan.signature
+ val feasible = summaryDatasets.filter { x =>
+ (x.signature, sig) match {
+ case (Some(sig1), Some(sig2)) =>
+ if (sig1.groupby && sig2.groupby && sig1.datasets.subsetOf(sig2.datasets)) {
+ true
+ } else if (!sig1.groupby && !sig2.groupby && sig1.datasets.subsetOf(sig2.datasets)) {
+ true
+ } else {
+ false
+ }
+
+ case _ => false
+ }
+ }
+ // heuristics: more tables involved in a summary data set => higher query reduction factor
+ feasible.sortWith(_.signature.get.datasets.size > _.signature.get.datasets.size)
+ }
+ }
+}
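
The feasibility filter above reduces to a compact predicate over signatures.
A hedged restatement (assuming only the Signature fields already used above,
groupby and datasets):

    import org.apache.carbondata.mv.plans.util.Signature

    // A summary dataset is feasible for a query when both plans agree on
    // whether they aggregate and the dataset's base tables are a subset of
    // the query's base tables.
    def isFeasible(mv: Signature, query: Signature): Boolean =
      mv.groupby == query.groupby && mv.datasets.subsetOf(query.datasets)
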
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
new file mode 100644
index 0000000..a04c85a
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular.ModularPlan
+
+/**
+ * Utility functions used by the MQO matcher to convert our plan to the new aggregation code path
+ */
+private[rewrite] object Utils extends PredicateHelper {
+
+ // used to match qb_2a with qb_2q, and sel_3a with sel_3q
+ private def doMatch(
+ operator_a: modular.Matchable,
+ operator_q: modular.Matchable,
+ alias_m: AttributeMap[Alias]): Option[modular.Matchable] = {
+ var matchable = true
+ val matched = operator_q.transformExpressions {
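+ // Each case below looks for a column in the subsumer's outputList whose
+ // pre-computed aggregate matches the query's aggregate, and rewrites the
+ // query aggregate over that column: Count rolls up as Sum of the stored
+ // count; Sum, Max and Min roll up as Sum, Max and Min respectively.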
+ case cnt_q@AggregateExpression(Count(exprs_q), _, false, _) =>
+ operator_a.outputList.find {
+ case alias: Alias if alias_m.contains(alias.toAttribute) &&
+ alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] &&
+ alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+ .aggregateFunction.isInstanceOf[Count] =>
+ // case for groupby
+ val cnt_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+ val exprs_a = cnt_a.aggregateFunction.asInstanceOf[Count].children
+ if (cnt_a.isDistinct != cnt_q.isDistinct || exprs_q.length != exprs_a.length) {
+ false
+ } else {
+ exprs_a.sortBy(_.hashCode()).zip(exprs_q.sortBy(_.hashCode()))
+ .forall(p => p._1.semanticEquals(p._2))
+ }
+
+ case attr: Attribute if alias_m.contains(attr) &&
+ alias_m(attr).child.isInstanceOf[AggregateExpression] &&
+ alias_m(attr).child.asInstanceOf[AggregateExpression]
+ .aggregateFunction.isInstanceOf[Count] =>
+ val cnt_a = alias_m(attr).child.asInstanceOf[AggregateExpression]
+ val exprs_a = cnt_a.aggregateFunction.asInstanceOf[Count].children
+ if (cnt_a.isDistinct != cnt_q.isDistinct || exprs_q.length != exprs_a.length) {
+ false
+ } else {
+ exprs_a.sortBy(_.hashCode()).zip(exprs_q.sortBy(_.hashCode()))
+ .forall(p => p._1.semanticEquals(p._2))
+ }
+
+ case _ => false
+ }.map { cnt => AggregateExpression(
+ Sum(cnt.toAttribute),
+ cnt_q.mode,
+ isDistinct = false,
+ cnt_q.resultId)
+ }.getOrElse { matchable = false; cnt_q }
+
+ case sum_q@AggregateExpression(Sum(expr_q), _, false, _) =>
+ operator_a.outputList.find {
+ case alias: Alias if alias_m.contains(alias.toAttribute) &&
+ alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] &&
+ alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+ .aggregateFunction.isInstanceOf[Sum] =>
+ val sum_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+ val expr_a = sum_a.aggregateFunction.asInstanceOf[Sum].child
+ if (sum_a.isDistinct != sum_q.isDistinct) {
+ false
+ } else {
+ expr_a.semanticEquals(expr_q)
+ }
+
+ case attr: Attribute if alias_m.contains(attr) &&
+ alias_m(attr).child.isInstanceOf[AggregateExpression] &&
+ alias_m(attr).child.asInstanceOf[AggregateExpression]
+ .aggregateFunction.isInstanceOf[Sum] =>
+ val sum_a = alias_m(attr).child.asInstanceOf[AggregateExpression]
+ val expr_a = sum_a.aggregateFunction.asInstanceOf[Sum].child
+ if (sum_a.isDistinct != sum_q.isDistinct) {
+ false
+ } else {
+ expr_a.semanticEquals(expr_q)
+ }
+
+ case _ => false
+ }.map { sum => AggregateExpression(
+ Sum(sum.toAttribute),
+ sum_q.mode,
+ isDistinct = false,
+ sum_q.resultId)
+ }.getOrElse { matchable = false; sum_q }
+
+ case max_q@AggregateExpression(Max(expr_q), _, false, _) =>
+ operator_a.outputList.find {
+ case alias: Alias if alias_m.contains(alias.toAttribute) &&
+ alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] &&
+ alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+ .aggregateFunction.isInstanceOf[Max] =>
+ val max_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+ val expr_a = max_a.aggregateFunction.asInstanceOf[Max].child
+ if (max_a.isDistinct != max_q.isDistinct) {
+ false
+ } else {
+ expr_a.semanticEquals(expr_q)
+ }
+
+ case attr: Attribute if alias_m.contains(attr) &&
+ alias_m(attr).child.isInstanceOf[AggregateExpression] &&
+ alias_m(attr).child.asInstanceOf[AggregateExpression]
+ .aggregateFunction.isInstanceOf[Max] =>
+ val max_a = alias_m(attr).child.asInstanceOf[AggregateExpression]
+ val expr_a = max_a.aggregateFunction.asInstanceOf[Max].child
+ if (max_a.isDistinct != max_q.isDistinct) {
+ false
+ } else {
+ expr_a.semanticEquals(expr_q)
+ }
+
+ case _ => false
+ }.map { max => AggregateExpression(
+ Max(max.toAttribute),
+ max_q.mode,
+ isDistinct = false,
+ max_q.resultId)
+ }.getOrElse { matchable = false; max_q }
+
+ case min_q@AggregateExpression(Min(expr_q), _, false, _) =>
+ operator_a.outputList.find {
+ case alias: Alias if alias_m.contains(alias.toAttribute) &&
+ alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] &&
+ alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+ .aggregateFunction.isInstanceOf[Min] => {
+ val min_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+ val expr_a = min_a.aggregateFunction.asInstanceOf[Min].child
+ if (min_a.isDistinct != min_q.isDistinct) {
+ false
+ } else {
+ expr_a.semanticEquals(expr_q)
+ }
+ }
+ case attr: Attribute if alias_m.contains(attr) &&
+ alias_m(attr).child.isInstanceOf[AggregateExpression] &&
+ alias_m(attr).child.asInstanceOf[AggregateExpression]
+ .aggregateFunction.isInstanceOf[Min] => {
+ val min_a = alias_m(attr).child.asInstanceOf[AggregateExpression]
+ val expr_a = min_a.aggregateFunction.asInstanceOf[Min].child
+ if (min_a.isDistinct != min_q.isDistinct) {
+ false
+ } else {
+ expr_a.semanticEquals(expr_q)
+ }
+ }
+ case _ => false
+ }.map { min => AggregateExpression(
+ Min(min.toAttribute),
+ min_q.mode,
+ isDistinct = false,
+ min_q.resultId)
+ }.getOrElse { matchable = false; min_q }
+
+ case other: AggregateExpression =>
+ matchable = false
+ other
+
+ case expr: Expression if !expr.isInstanceOf[AggregateFunction] =>
+ operator_a.outputList.find {
+ case alias: Alias if alias_m.contains(alias.toAttribute) &&
+ alias_m(alias.toAttribute).child.semanticEquals(expr) &&
+ !alias_m(alias.toAttribute).child
+ .isInstanceOf[AggregateExpression] => true
+ case attr: Attribute if alias_m.contains(attr) &&
+ alias_m(attr).child.semanticEquals(expr) &&
+ !alias_m(attr).child.isInstanceOf[AggregateExpression] => true
+ case _ => false
+ }.map(_.toAttribute)
+ .getOrElse { expr }
+ }
+
+ if (matchable) {
+ Some(matched)
+ } else {
+ None
+ }
+ }
+
+ def tryMatch(a: modular.Matchable,
+ q: modular.Matchable,
+ m: AttributeMap[Alias]): Option[modular.Matchable] = {
+ if (a.getClass == q.getClass) {
+ doMatch(a, q, m)
+ } else {
+ None
+ }
+ }
+
+ /**
+ * (Subsumee) expression translation:
+ *
+ * The translation begins by creating a copy of the whole expression (step 1). Then each
+ * input column is translated in turn. To translate an input column, we first find the child
+ * block that produces the input column and replace the input column with the associated
+ * output column expression (step 2). The next step is to replace the translated expression
+ * with its equivalent output expression at the top of the child compensation (step 3). Then
+ * we recursively translate each new input column (except input columns produced by rejoin
+ * children) until we reach the bottom of the child compensation (step 4). Finally, we find
+ * an equivalent output expression in the subsumer (step 5).
+ *
+ * So given a subsumee expression, the translation follows this path:
+ *
+ * top of subsumee --> child of subsumee --> top of compensation --> bottom of compensation
+ * --> top of subsumer
+ *
+ * To simplify this we assume that, in the subsumer, the outputList of the top select
+ * corresponds 1-1 to the outputList of its group-by child. Note that the subsumer's
+ * outputList is a list of attributes while that of the group-by is a list of aliases.
+ */
+ private def doTopSelectTranslation(exprE: Expression,
+ exprListR: Seq[Expression],
+ subsumee: ModularPlan,
+ subsumer: ModularPlan,
+ compensation: Option[ModularPlan]): Option[Expression] = {
+ (subsumer, subsumee, compensation) match {
+ // top selects whose children do not match exactly
+ // for simplicity, we assume outputList of subsumer is 1-1 corresponding to that of its
+ // immediately groupby child
+ case (
+ sel_3a@modular.Select(
+ _, _, _, _, _,
+ Seq(gb_2a@modular.GroupBy(
+ _, _, _, _, sel_2a@modular.Select(_, _, _, _, _, _, _, _, _), _, _)),
+ _, _, _),
+ sel_3q@modular.Select(
+ _, _, _, _, _, Seq(gb_2q@modular.GroupBy(_, _, _, _, _, _, _)), _, _, _),
+ Some(gb_2c@modular.GroupBy(
+ _, _, _, _, sel_2c@modular.Select(_, _, _, _, _, _, _, _, _), _, _))
+ ) =>
+ if (sel_3q.predicateList.contains(exprE)) {
+ val expr1E = exprE.transform {
+ case attr: Attribute =>
+ gb_2c.outputList.lift(
+ gb_2q.outputList.indexWhere {
+ case alias: Alias if alias.toAttribute.semanticEquals(attr) => true
+ case _ => false
+ }).getOrElse { attr }
+ }
+ if (expr1E.eq(exprE)) {
+ None
+ } else {
+ Some(expr1E)
+ }
+ } else if (sel_3q.outputList.contains(exprE)) {
+ exprE match {
+ case attr: Attribute => // this subexpression must be in the subsumee's select output list
+ gb_2c.outputList.lift(
+ gb_2q.outputList.indexWhere {
+ case a if a.toAttribute.semanticEquals(attr) => true
+ case _ => false
+ })
+
+ case alias: Alias =>
+ gb_2c.outputList.lift(
+ gb_2q.outputList.indexWhere {
+ case a if a.toAttribute.semanticEquals(alias.toAttribute) => true
+ case _ => false
+ })
+
+ case _ => None
+ }
+ } else if (sel_2c.predicateList.contains(exprE)) {
+ if (sel_2a.predicateList.exists(_.semanticEquals(exprE)) ||
+ canEvaluate(exprE, subsumer)) {
+ Some(exprE)
+ } else {
+ None
+ }
+ } else if (gb_2c.predicateList.contains(exprE)) {
+ if (gb_2a.outputList.exists {
+ case a: Alias if a.child.semanticEquals(exprE) => true
+ case _ => false
+ } || canEvaluate(exprE, subsumer)) {
+ Some(exprE)
+ } else {
+ None
+ }
+ } else if (sel_2a.predicateList.exists(_.semanticEquals(exprE)) ||
+ canEvaluate(exprE, subsumer)) {
+ Some(exprE)
+ } else {
+ None
+ }
+
+ case _ => None // TODO: implement this
+ }
+ }
+
+ private def isSemanticEquivalent(translatedExpr: Expression, subsumer: ModularPlan) = {
+ subsumer match {
+ // if subsumer has where clause, even if expr can be translated into new expr based on
+ // subsumer, the two may not be semantic equivalent
+ // TODO: refine this
+ case modular.Select(
+ _, _, predicateList, _, _,
+ Seq(modular.GroupBy(_, _, _, _, _, _, _)), _, _, _)
+ if predicateList.nonEmpty => false
+ case _ => true
+ }
+ }
+
+ /**
+ * derivable = translatable + semantically equivalent
+ *
+ * The translation method described above is also the first step in deriving a subsumee
+ * expression Eexp from the subsumer's output columns. After translating Eexp to E'exp,
+ * derivability can be established by making sure that the subsumer computes at its output
+ * certain necessary subexpressions of E'exp (or even the entire E'exp). The problem that
+ * arises, however, is to determine the parts of E'exp that can/should be computed by the
+ * subsumer.
+ *
+ * In general, translation causes an expression to expand by replacing individual input
+ * columns with equivalent subexpressions. Derivation is the reverse operation, where
+ * pieces of the translated expression are collapsed as they are computed along the
+ * derivation path.
+ */
+
+ def isDerivable(exprE: Expression,
+ exprListR: Seq[Expression],
+ subsumee: ModularPlan,
+ subsumer: ModularPlan,
+ compensation: Option[ModularPlan]): Boolean = {
+ val exprE1 = doTopSelectTranslation(exprE, exprListR, subsumee, subsumer, compensation)
+ exprE1 match {
+ case Some(e) => isSemanticEquivalent(e, subsumer)
+ case _ => false
+ }
+ }
+
+}
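To make doMatch concrete: when the user query aggregates at a coarser
granularity than the registered summary dataset, its COUNT cannot read the
stored count column directly; it has to be rolled up by summing the
pre-aggregated counts, while SUM, MAX and MIN roll up as SUM, MAX and MIN
over their stored columns. A hedged illustration; summary_dataset and all
column names are hypothetical:

    // Summary dataset registered with the catalog (hypothetical schema).
    val summaryDatasetSQL =
      """
        |SELECT faid, flid, count(*) AS cnt
        |FROM Fact
        |GROUP BY faid, flid
      """.stripMargin.trim

    // User query at a coarser granularity (grouped by faid only).
    val userQuerySQL =
      """
        |SELECT faid, count(*)
        |FROM Fact
        |GROUP BY faid
      """.stripMargin.trim

    // doMatch substitutes the query's Count with Sum(cnt.toAttribute), so
    // the rewritten query sums the pre-aggregated counts:
    val rewrittenSQL =
      """
        |SELECT faid, sum(cnt)
        |FROM summary_dataset
        |GROUP BY faid
      """.stripMargin.trim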
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectSelectExactChildrenSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectSelectExactChildrenSuite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectSelectExactChildrenSuite.scala
new file mode 100644
index 0000000..0ee2475
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectSelectExactChildrenSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.test.util.PlanTest
+
+class SelectSelectExactChildrenSuite extends PlanTest {
+
+ object Match extends DefaultMatchMaker {
+ val patterns = SelectSelectNoChildDelta :: Nil
+ }
+
+ val testRelation1 = LocalRelation('tid.int, 'fpgid.int, 'flid.int, 'date.timestamp, 'faid.int, 'price.double, 'qty.int, 'disc.string)
+ val testRelation2 = LocalRelation('lid.int, 'city.string, 'state.string, 'country.string)
+
+// test("pro-typical lower select") {
+// val fact = testRelation1.subquery('fact)
+// val dim = testRelation2.subquery('dim)
+//
+// val lowerSTSelect =
+// fact
+// .select('faid,'flid,Year('date) as 'year)
+// .analyze
+// val lowerUQSelect =
+// fact.join(dim)
+// .where("fact.flid".attr === "dim.lid".attr && "dim.country".attr === "USA")
+// .select('faid,'flid,Year('date) as 'year, 'state)
+// .analyze
+//
+// val matched = Match.execute(lowerSTSelect.model,lowerUQSelect.model,None).next
+//
+// val correctAnswer =
+// lowerSTSelect.join(dim)
+// .where("fact.flid".attr === "dim.lid".attr && "dim.country".attr === "USA")
+// .select('faid,'flid,Year('date) as 'year, 'state)
+// .analyze.model
+//
+// comparePlans(matched, correctAnswer)
+// }
+
+// val testSummaryDataset =
+// s"""
+// |SELECT faid, flid, year_proj(date) as year, count(*) as cnt
+// |FROM Fact
+// |GROUP BY faid, flid, year_proj(date)
+// """.stripMargin.trim
+//
+// val testUserQuery =
+// s"""
+// |SELECT faid, state, year_proj(date) as year, count(*) as cnt
+// |FROM Fact
+// | INNER JOIN Dim
+// | ON Fact.flid = Dim.lid AND Dim.country = "USA"
+// |GROUP BY Fact.faid, Dim.state, year_proj(Fact.date)
+// |HAVING count(*) > 2
+// """.stripMargin.trim
+
+
+}
\ No newline at end of file
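The commented-out query pair above sketches the intended rollup-with-join
case: the user query groups more coarsely than the summary dataset and pulls
in a dimension column. Under the Count-to-Sum substitution in Utils.doMatch,
and assuming each Dim.lid maps to a single row, the rewrite such a test would
assert is roughly the following (illustrative only; summary_dataset is a
placeholder name):

    // Hypothetical rewrite of testUserQuery over testSummaryDataset.
    val expectedRewriteSQL =
      """
        |SELECT faid, Dim.state, year, sum(cnt) AS cnt
        |FROM summary_dataset
        |  INNER JOIN Dim
        |  ON summary_dataset.flid = Dim.lid AND Dim.country = "USA"
        |GROUP BY faid, Dim.state, year
        |HAVING sum(cnt) > 2
      """.stripMargin.trim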
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala
new file mode 100644
index 0000000..86b9dce
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test.util.PlanTest
+import org.scalatest.BeforeAndAfter
+
+import org.apache.carbondata.mv.MQOSession
+
+class TestSQLSuite extends PlanTest with BeforeAndAfter {
+
+ import org.apache.carbondata.mv.rewrite.matching.TestSQLBatch._
+
+ val spark = SparkSession.builder().master("local").enableHiveSupport().getOrCreate()
+ val testHive = new org.apache.spark.sql.hive.test.TestHiveContext(spark.sparkContext, false)
+ val hiveClient = testHive.sparkSession.metadataHive
+
+ test("protypical mqo rewrite test") {
+
+ hiveClient.runSqlHive(
+ s"""
+ |CREATE TABLE Fact (
+ | `tid` int,
+ | `fpgid` int,
+ | `flid` int,
+ | `date` timestamp,
+ | `faid` int,
+ | `price` double,
+ | `qty` int,
+ | `disc` string
+ |)
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ |STORED AS TEXTFILE
+ """.stripMargin.trim
+ )
+
+ hiveClient.runSqlHive(
+ s"""
+ |CREATE TABLE Dim (
+ | `lid` int,
+ | `city` string,
+ | `state` string,
+ | `country` string
+ |)
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ |STORED AS TEXTFILE
+ """.stripMargin.trim
+ )
+
+ hiveClient.runSqlHive(
+ s"""
+ |CREATE TABLE Item (
+ | `i_item_id` int,
+ | `i_item_sk` int
+ |)
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ |STORED AS TEXTFILE
+ """.stripMargin.trim
+ )
+
+ val dest = "case_10"
+
+ sampleTestCases.foreach { testcase =>
+ if (testcase._1 == dest) {
+ val mqoSession = new MQOSession(testHive.sparkSession)
+ val summary = testHive.sparkSession.sql(testcase._2)
+ mqoSession.sharedState.registerSummaryDataset(summary)
+ val rewrittenSQL = mqoSession.rewrite(testcase._3).toCompactSQL.trim
+
+ if (!rewrittenSQL.equals(testcase._4)) {
+ println(
+ s"""
+ |=== FAIL: SQLs do not match ===
+ |${sideBySide(rewrittenSQL, testcase._4).mkString("\n")}
+ """.stripMargin)
+ }
+ }
+
+ }
+ }
+ testHive.sparkSession.cloneSession()
+}
\ No newline at end of file
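Stripped of the Hive setup, the flow this suite exercises is: build an
MQOSession over a SparkSession, register a summary dataset, then rewrite a
user query and emit its compact SQL. A minimal sketch, assuming a table
Fact(faid INT, flid INT, ...) already exists as created by the DDL above:

    import org.apache.spark.sql.SparkSession

    import org.apache.carbondata.mv.MQOSession

    object MqoRewriteSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local").enableHiveSupport().getOrCreate()
        val mqoSession = new MQOSession(spark)

        // Register a summary dataset (a materialized-view candidate).
        val summary = spark.sql(
          "SELECT faid, flid, count(*) AS cnt FROM Fact GROUP BY faid, flid")
        mqoSession.sharedState.registerSummaryDataset(summary)

        // Rewrite a user query against the registered summary dataset.
        val rewritten = mqoSession.rewrite("SELECT faid, count(*) FROM Fact GROUP BY faid")
        println(rewritten.toCompactSQL)
      }
    }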