Posted to commits@carbondata.apache.org by ja...@apache.org on 2020/02/04 12:55:02 UTC

[carbondata] branch master updated: [CARBONDATA-3532] Support Query Rollup for MV TimeSeries Queries

This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8acaf08  [CARBONDATA-3532] Support Query Rollup for MV TimeSeries Queries
8acaf08 is described below

commit 8acaf08b7494dfd9652380cb86d70eeb76738484
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Tue Dec 3 11:19:57 2019 +0530

    [CARBONDATA-3532] Support Query Rollup for MV TimeSeries Queries
    
    Supported Query RollUp for MV TimeSeries queries.
    How is it supported?
    For each timeseries query, check whether the query can be rolled up from the existing datasets by replacing the granularity in the given user query. If the query is rewritten and rolled up, then add Select and Group By nodes on top of the rewritten query.
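    
    For illustration, a sketch based on the new TestMVTimeSeriesQueryRollUp
    suite added in this commit (the sql(...) helper comes from that test harness):
    
        // hour-level MV datamap over maintable, as in the tests below
        sql("create datamap datamap2 on table maintable using 'mv' as " +
          "select timeseries(projectjoindate,'hour'),projectcode from maintable")
        // day-level query: no day-level datamap exists, so the plan is rewritten
        // against datamap2_table and re-aggregated from hour up to day
        sql("select timeseries(projectjoindate,'day'),projectcode from maintable " +
          "group by timeseries(projectjoindate,'day'),projectcode")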
    
    This closes #3495
---
 .../apache/carbondata/mv/datamap/MVHelper.scala    | 160 +++++++++++-
 .../carbondata/mv/rewrite/DefaultMatchMaker.scala  |  19 +-
 .../apache/carbondata/mv/rewrite/Navigator.scala   | 181 ++++++++++++-
 .../timeseries/TestMVTimeSeriesQueryRollUp.scala   | 279 +++++++++++++++++++++
 .../carbondata/mv/plans/modular/ModularPlan.scala  |  18 ++
 .../org/apache/carbondata/mv/plans/package.scala   |  32 ++-
 .../apache/carbondata/mv/plans/util/Printers.scala |  56 ++++-
 docs/datamap/mv-datamap-guide.md                   |  35 ++-
 .../apache/spark/sql/CarbonToSparkAdapter.scala    |   8 +
 .../apache/spark/sql/CarbonToSparkAdapter.scala    |   8 +
 10 files changed, 779 insertions(+), 17 deletions(-)

diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 12cad37..495ad5b 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.command.timeseries.{TimeSeriesFunction, Ti
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.sql.types.{ArrayType, DateType, MapType, StructType}
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{DataMapUtil, PartitionUtils}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -830,18 +831,173 @@ object MVHelper {
    */
   def rewriteWithMVTable(rewrittenPlan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
     if (rewrittenPlan.find(_.rewritten).isDefined) {
-      val updatedDataMapTablePlan = rewrittenPlan transform {
+      var updatedMVTablePlan = rewrittenPlan transform {
         case s: Select =>
           MVHelper.updateDataMap(s, rewrite)
         case g: GroupBy =>
           MVHelper.updateDataMap(g, rewrite)
       }
-      updatedDataMapTablePlan
+      if (rewrittenPlan.isRolledUp) {
+        // If the rewritten query is rolled up, then rewrite the query based on the original
+        // modular plan: build a new outputList from the original modular plan and wrap the
+        // rewritten plan with Select & GroupBy nodes using that new outputList.
+
+        // For example:
+        // Given User query:
+        // SELECT timeseries(col,'day') from maintable group by timeseries(col,'day')
+        // If plan is rewritten as per 'hour' granularity of datamap1,
+        // then rewritten query will be like,
+        // SELECT datamap1_table.`UDF:timeseries_projectjoindate_hour` AS `UDF:timeseries
+        // (projectjoindate, hour)`
+        // FROM
+        // default.datamap1_table
+        // GROUP BY datamap1_table.`UDF:timeseries_projectjoindate_hour`
+        //
+        // Now, rewrite the rewritten plan as per the 'day' granularity
+        // SELECT timeseries(gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)`,'day' ) AS
+        // `UDF:timeseries(projectjoindate, day)`
+        //  FROM
+        //  (SELECT datamap2_table.`UDF:timeseries_projectjoindate_hour` AS `UDF:timeseries
+        //  (projectjoindate, hour)`
+        //  FROM
+        //    default.datamap2_table
+        //  GROUP BY datamap2_table.`UDF:timeseries_projectjoindate_hour`) gen_subsumer_0
+        // GROUP BY timeseries(gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)`,'day' )
+        rewrite.modularPlan match {
+          case select: Select =>
+            val outputList = select.outputList
+            val rolledUpOutputList = updatedMVTablePlan.asInstanceOf[Select].outputList
+            var finalOutputList: Seq[NamedExpression] = Seq.empty
+            val mapping = outputList zip rolledUpOutputList
+            val newSubsume = rewrite.newSubsumerName()
+
+            mapping.foreach { outputLists =>
+              val name: String = getAliasName(outputLists._2)
+              outputLists._1 match {
+                case a@Alias(scalaUdf: ScalaUDF, aliasName) =>
+                  if (scalaUdf.function.isInstanceOf[TimeSeriesFunction]) {
+                    val newName = newSubsume + ".`" + name + "`"
+                    val transformedUdf = transformTimeSeriesUdf(scalaUdf, newName)
+                    finalOutputList = finalOutputList.:+(Alias(transformedUdf, aliasName)(a.exprId,
+                      a.qualifier).asInstanceOf[NamedExpression])
+                  }
+                case Alias(attr: AttributeReference, _) =>
+                  finalOutputList = finalOutputList.:+(
+                    CarbonToSparkAdapter.createAttributeReference(attr, name, newSubsume))
+                case attr: AttributeReference =>
+                  finalOutputList = finalOutputList.:+(
+                    CarbonToSparkAdapter.createAttributeReference(attr, name, newSubsume))
+              }
+            }
+            val newChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
+            val newAliasMap = new collection.mutable.HashMap[Int, String]()
+
+            val sel_plan = select.copy(outputList = finalOutputList,
+              inputList = finalOutputList,
+              predicateList = Seq.empty)
+            newChildren += sel_plan
+            newAliasMap += (newChildren.indexOf(sel_plan) -> newSubsume)
+            updatedMVTablePlan = select.copy(outputList = finalOutputList,
+              inputList = finalOutputList,
+              aliasMap = newAliasMap.toMap,
+              predicateList = Seq.empty,
+              children = Seq(updatedMVTablePlan)).setRewritten()
+
+          case groupBy: GroupBy =>
+            updatedMVTablePlan match {
+              case select: Select =>
+                val selectOutputList = groupBy.outputList
+                val rolledUpOutputList = updatedMVTablePlan.asInstanceOf[Select].outputList
+                var finalOutputList: Seq[NamedExpression] = Seq.empty
+                var predicateList: Seq[Expression] = Seq.empty
+                val mapping = selectOutputList zip rolledUpOutputList
+                val newSubsume = rewrite.newSubsumerName()
+
+                mapping.foreach { outputLists =>
+                  val aliasName: String = getAliasName(outputLists._2)
+                  outputLists._1 match {
+                    case a@Alias(scalaUdf: ScalaUDF, _) =>
+                      if (scalaUdf.function.isInstanceOf[TimeSeriesFunction]) {
+                        val newName = newSubsume + ".`" + aliasName + "`"
+                        val transformedUdf = transformTimeSeriesUdf(scalaUdf, newName)
+                        groupBy.predicateList.foreach {
+                          case _: ScalaUDF =>
+                            predicateList = predicateList.:+(transformedUdf)
+                          case attr: AttributeReference =>
+                            predicateList = predicateList.:+(
+                              CarbonToSparkAdapter.createAttributeReference(attr,
+                                attr.name,
+                                newSubsume))
+                        }
+                        finalOutputList = finalOutputList.:+(Alias(transformedUdf, a.name)(a
+                          .exprId, a.qualifier).asInstanceOf[NamedExpression])
+                      }
+                    case attr: AttributeReference =>
+                      finalOutputList = finalOutputList.:+(
+                        CarbonToSparkAdapter.createAttributeReference(attr, aliasName, newSubsume))
+                    case Alias(attr: AttributeReference, _) =>
+                      finalOutputList = finalOutputList.:+(
+                        CarbonToSparkAdapter.createAttributeReference(attr, aliasName, newSubsume))
+                    case a@Alias(agg: AggregateExpression, name) =>
+                      val newAgg = agg.transform {
+                        case attr: AttributeReference =>
+                          CarbonToSparkAdapter.createAttributeReference(attr, name, newSubsume)
+                      }
+                      finalOutputList = finalOutputList.:+(Alias(newAgg, name)(a.exprId,
+                        a.qualifier).asInstanceOf[NamedExpression])
+                    case other => other
+                  }
+                }
+                val newChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
+                val newAliasMap = new collection.mutable.HashMap[Int, String]()
+
+                val sel_plan = select.copy(outputList = finalOutputList,
+                  inputList = finalOutputList,
+                  predicateList = Seq.empty)
+                newChildren += sel_plan
+                newAliasMap += (newChildren.indexOf(sel_plan) -> newSubsume)
+                updatedMVTablePlan = select.copy(outputList = finalOutputList,
+                  inputList = finalOutputList,
+                  aliasMap = newAliasMap.toMap,
+                  children = Seq(updatedMVTablePlan)).setRewritten()
+                updatedMVTablePlan = groupBy.copy(outputList = finalOutputList,
+                  inputList = finalOutputList,
+                  predicateList = predicateList,
+                  alias = Some(newAliasMap.mkString),
+                  child = updatedMVTablePlan).setRewritten()
+                updatedMVTablePlan = select.copy(outputList = finalOutputList,
+                  inputList = finalOutputList,
+                  children = Seq(updatedMVTablePlan)).setRewritten()
+            }
+        }
+      }
+      updatedMVTablePlan
     } else {
       rewrittenPlan
     }
   }
 
+  def getAliasName(exp: NamedExpression): String = {
+    exp match {
+      case Alias(_, name) =>
+        name
+      case attr: AttributeReference =>
+        attr.name
+    }
+  }
+
+  private def transformTimeSeriesUdf(scalaUdf: ScalaUDF, newName: String): Expression = {
+    scalaUdf.transformDown {
+      case attr: AttributeReference =>
+        AttributeReference(newName, attr.dataType)(
+          exprId = attr.exprId,
+          qualifier = attr.qualifier)
+      case l: Literal =>
+        Literal(UTF8String.fromString("'" + l.toString() + "'"),
+          org.apache.spark.sql.types.DataTypes.StringType)
+    }
+  }
+
   private def getUpdatedOutputList(outputList: Seq[NamedExpression],
       dataMapTableRelation: Option[ModularPlan]): Seq[NamedExpression] = {
     dataMapTableRelation.collect {
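
A minimal sketch (Spark Catalyst only, names illustrative) of what
transformTimeSeriesUdf above does: the udf's column reference is replaced by an
attribute named after the subsumer-qualified alias, and the granularity literal
is re-quoted so the rolled-up plan prints back as parseable SQL.

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
    import org.apache.spark.sql.types.{DataTypes, TimestampType}
    import org.apache.spark.unsafe.types.UTF8String

    val attr = AttributeReference("projectjoindate", TimestampType)()
    // the reference now points at the wrapping select's output, e.g. gen_subsumer_0
    val renamed = AttributeReference(
      "gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)`", attr.dataType)(
      exprId = attr.exprId, qualifier = attr.qualifier)
    // the literal is re-quoted: 'day' instead of day, mirroring the patch above
    val requoted = Literal(UTF8String.fromString("'day'"), DataTypes.StringType)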
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index 06dd7a1..50d6ae7 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -544,14 +544,19 @@ object GroupbyGroupbySelectOnlyChildDelta extends DefaultMatchPattern with Predi
    */
   private def isDerivableForUDF(exprE: Expression, exprListR: Seq[Expression]): Boolean = {
     var canBeDerived = false
-    exprListR.forall {
-      case a: ScalaUDF =>
-        a.references.foreach { a =>
-          canBeDerived = exprE.sql.contains(a.name)
-        }
-        canBeDerived
+    exprE match {
+      case f: ScalaUDF =>
+        canEvaluate(f, exprListR)
       case _ =>
-        canBeDerived
+        exprListR.forall {
+          case a: ScalaUDF =>
+            a.references.foreach { a =>
+              canBeDerived = exprE.sql.contains(a.name)
+            }
+            canBeDerived
+          case _ =>
+            canBeDerived
+        }
     }
   }
 
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
index 54a684e..b4a0ae1 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
@@ -17,8 +17,19 @@
 
 package org.apache.carbondata.mv.rewrite
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import scala.collection.JavaConverters._
+import scala.util.control.Breaks._
 
+import org.apache.spark.sql.CarbonToSparkAdapter
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, Expression, Literal, NamedExpression, ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan}
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.DataTypes
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.core.metadata.schema.datamap.Granularity
+import org.apache.carbondata.core.preagg.TimeSeriesFunctionEnum
 import org.apache.carbondata.mv.datamap.MVUtil
 import org.apache.carbondata.mv.expressions.modular._
 import org.apache.carbondata.mv.plans.modular
@@ -27,6 +38,8 @@ import org.apache.carbondata.mv.session.MVSession
 
 private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession) {
 
+  var modularPlan: java.util.Set[ModularPlan] = new java.util.HashSet[ModularPlan]()
+
   def rewriteWithSummaryDatasets(plan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
     val replaced = plan.transformAllExpressions {
       case s: ModularSubquery =>
@@ -66,12 +79,120 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession)
         }
     }
     if (rewrittenPlan.fastEquals(plan)) {
-      None
+      if (modularPlan.asScala.exists(p => p.sameResult(rewrittenPlan))) {
+        return None
+      }
+      getRolledUpModularPlan(rewrittenPlan, rewrite)
     } else {
       Some(rewrittenPlan)
     }
   }
 
+  /**
+   * Check whether the modular plan can be rolled up by rewriting and matching it
+   * against existing mv datasets.
+   * @param rewrittenPlan the plan to check for rollup
+   * @param rewrite the query rewrite context
+   * @return the new modular plan, if rollup is possible
+   */
+  def getRolledUpModularPlan(rewrittenPlan: ModularPlan,
+      rewrite: QueryRewrite): Option[ModularPlan] = {
+    var canDoRollUp = true
+    // check if modular plan contains timeseries udf
+    val timeSeriesUdf = rewrittenPlan match {
+      case s: Select =>
+        getTimeSeriesUdf(s.outputList)
+      case g: GroupBy =>
+        getTimeSeriesUdf(g.outputList)
+      case _ => (null, null)
+    }
+    // set canDoRollUp to false for Join queries and for filters containing a timeseries udf
+    // TODO: support rollUp for join queries
+    rewrite.optimizedPlan.transformDown {
+      case join@Join(_, _, _, _) =>
+        canDoRollUp = false
+        join
+      case f@Filter(condition: Expression, _) =>
+        condition.collect {
+          case s: ScalaUDF if s.function.isInstanceOf[TimeSeriesFunction] =>
+            canDoRollUp = false
+        }
+        f
+    }
+    if (null != timeSeriesUdf._2 && canDoRollUp) {
+      // check for rollup and rewrite the plan
+      // collect all the datasets which contain timeseries datamaps
+      val dataSets = catalog.lookupFeasibleSummaryDatasets(rewrittenPlan)
+      var granularity: java.util.List[TimeSeriesFunctionEnum] = new java.util
+      .ArrayList[TimeSeriesFunctionEnum]()
+      // Get all the lower granularities for the query from datasets
+      dataSets.foreach { dataset =>
+        if (dataset.dataMapSchema.isTimeSeries) {
+          dataset.plan.transformExpressions {
+            case a@Alias(udf: ScalaUDF, _) =>
+              if (udf.function.isInstanceOf[TimeSeriesFunction]) {
+                val gran = udf.children.last.asInstanceOf[Literal].toString().toUpperCase()
+                if (Granularity.valueOf(timeSeriesUdf._2.toUpperCase).ordinal() <=
+                    Granularity.valueOf(gran).ordinal()) {
+                  granularity.add(TimeSeriesFunctionEnum.valueOf(gran))
+                }
+              }
+              a
+          }
+        }
+      }
+      if (!granularity.isEmpty) {
+        granularity = granularity.asScala.sortBy(_.getOrdinal)(Ordering[Int].reverse).asJava
+        var orgTable: String = null
+        // get query Table
+        rewrittenPlan.collect {
+          case m: ModularRelation =>
+            orgTable = m.tableName
+        }
+        var queryGranularity: String = null
+        var newRewrittenPlan = rewrittenPlan
+        // replace granularities in the plan and check if plan can be changed
+        breakable {
+          granularity.asScala.foreach { func =>
+            newRewrittenPlan = rewriteGranularityInPlan(rewrittenPlan, func.getName)
+            modularPlan.add(newRewrittenPlan)
+            val logicalPlan = session
+              .sparkSession
+              .sql(newRewrittenPlan.asCompactSQL)
+              .queryExecution
+              .optimizedPlan
+            modularPlan.clear()
+            var rolledUpTable: String = null
+            logicalPlan.collect {
+              case l: LogicalRelation =>
+                rolledUpTable = l.catalogTable.get.identifier.table
+            }
+            if (!rolledUpTable.equalsIgnoreCase(orgTable)) {
+              queryGranularity = func.getName
+              break()
+            }
+          }
+        }
+        if (null != queryGranularity) {
+          // rewrite the plan and set it as rolled up plan
+          val modifiedPlan = rewriteWithSummaryDatasetsCore(newRewrittenPlan, rewrite)
+          if (modifiedPlan.isDefined) {
+            modifiedPlan.get.map(_.setRolledUp())
+            modifiedPlan
+          } else {
+            None
+          }
+        } else {
+          None
+        }
+      } else {
+        None
+      }
+    } else {
+      None
+    }
+  }
+
   def subsume(
       subsumer: ModularPlan,
       subsumee: ModularPlan,
@@ -206,4 +327,60 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession)
     }
     true
   }
+
+  /**
+   * Replace the granularity in the modular plan with the identified immediate
+   * lower-level granularity to perform the rollup
+   *
+   * @param plan             the plan to be rewritten
+   * @param queryGranularity the granularity to substitute
+   * @return plan with granularity changed
+   */
+  private def rewriteGranularityInPlan(plan: ModularPlan, queryGranularity: String) = {
+    val newPlan = plan.transformDown {
+      case p => p.transformAllExpressions {
+        case udf: ScalaUDF =>
+          if (udf.function.isInstanceOf[TimeSeriesFunction]) {
+            val transformedUdf = udf.transformDown {
+              case _: Literal =>
+                new Literal(UTF8String.fromString(queryGranularity.toLowerCase),
+                  DataTypes.StringType)
+            }
+            transformedUdf
+          } else {
+            udf
+          }
+        case alias@Alias(udf: ScalaUDF, name) =>
+          if (udf.function.isInstanceOf[TimeSeriesFunction]) {
+            var literal: String = null
+            val transformedUdf = udf.transformDown {
+              case l: Literal =>
+                literal = l.value.toString
+                new Literal(UTF8String.fromString(queryGranularity.toLowerCase),
+                  DataTypes.StringType)
+            }
+            Alias(transformedUdf,
+              name.replace(", " + literal, ", " + queryGranularity))(alias.exprId,
+              alias.qualifier).asInstanceOf[NamedExpression]
+          } else {
+            alias
+          }
+      }
+    }
+    newPlan
+  }
+
+  def getTimeSeriesUdf(outputList: scala.Seq[NamedExpression]): (String, String) = {
+    outputList.collect {
+      case Alias(udf: ScalaUDF, name) =>
+        if (udf.function.isInstanceOf[TimeSeriesFunction]) {
+          udf.children.collect {
+            case l: Literal =>
+              return (name, l.value.toString)
+          }
+        }
+    }
+    (null, null)
+  }
+
 }
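
Two pieces of the Navigator change can be sketched in isolation. First, the
candidate-granularity selection: datamap granularities at or finer than the
query's are kept and tried coarsest-first (a toy enum stands in for Carbon's
Granularity/TimeSeriesFunctionEnum here):

    object Gran extends Enumeration {
      val SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR = Value
    }
    val queryGran = Gran.DAY
    val datamapGrans = Seq(Gran.SECOND, Gran.HOUR)
    // keep candidates at or finer than the query, try the coarsest first
    val candidates = datamapGrans.filter(_ <= queryGran).sortBy(-_.id)
    assert(candidates == Seq(Gran.HOUR, Gran.SECOND))

Second, rewriteGranularityInPlan swaps the granularity literal inside the
timeseries udf; on a bare Catalyst literal the same transform looks like:

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.types.DataTypes
    import org.apache.spark.unsafe.types.UTF8String

    val day = Literal(UTF8String.fromString("day"), DataTypes.StringType)
    val hour = day.transformDown {
      case _: Literal => Literal(UTF8String.fromString("hour"), DataTypes.StringType)
    }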
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala
new file mode 100644
index 0000000..d9c17b3
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala
@@ -0,0 +1,279 @@
+  /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+  package org.apache.carbondata.mv.timeseries
+
+  import org.apache.spark.sql.test.util.QueryTest
+  import org.scalatest.BeforeAndAfterAll
+
+  import org.apache.carbondata.mv.rewrite.TestUtil
+
+  class TestMVTimeSeriesQueryRollUp extends QueryTest with BeforeAndAfterAll {
+
+    override def beforeAll(): Unit = {
+      drop()
+      createTable()
+      loadData("maintable")
+    }
+
+    override def afterAll(): Unit = {
+      drop()
+    }
+
+    test("test timeseries query rollup with simple projection") {
+      val result  = sql("select timeseries(projectjoindate,'day'),projectcode from maintable")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),projectcode from maintable")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),projectcode from maintable")
+      val df = sql("select timeseries(projectjoindate,'day'),projectcode from maintable")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with simple projection with group by - scenario-1") {
+      val result  = sql("select timeseries(projectjoindate,'day'),projectcode from maintable group by timeseries(projectjoindate,'day'),projectcode")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),projectcode from maintable group by timeseries(projectjoindate,'second'),projectcode")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),projectcode from maintable group by timeseries(projectjoindate,'hour'),projectcode")
+      var df = sql("select timeseries(projectjoindate,'day'),projectcode from maintable group by timeseries(projectjoindate,'day'),projectcode")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap2"))
+      checkAnswer(result,df)
+      df = sql("select timeseries(projectjoindate,'second'),projectcode from maintable group by timeseries(projectjoindate,'second'),projectcode")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap1"))
+      df = sql("select timeseries(projectjoindate,'second') from maintable group by timeseries(projectjoindate,'second')")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap1"))
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with simple projection with group by - scenario-1 with single datamap") {
+      val result  = sql("select timeseries(projectjoindate,'day'),projectcode from maintable group by timeseries(projectjoindate,'day'),projectcode")
+      sql("drop datamap if exists datamap1")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),projectcode from maintable group by timeseries(projectjoindate,'second'),projectcode")
+      var df = sql("select timeseries(projectjoindate,'day'),projectcode from maintable group by timeseries(projectjoindate,'day'),projectcode")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap1"))
+      checkAnswer(result,df)
+      df = sql("select timeseries(projectjoindate,'second'),projectcode from maintable group by timeseries(projectjoindate,'second'),projectcode")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap1"))
+      df = sql("select timeseries(projectjoindate,'second') from maintable group by timeseries(projectjoindate,'second')")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap1"))
+      sql("drop datamap if exists datamap1")
+    }
+
+    test("test timeseries query rollup with simple projection with group by - scenario-2") {
+      val result  = sql("select timeseries(projectjoindate,'day'),sum(projectcode) from maintable group by timeseries(projectjoindate,'day')")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),sum(projectcode) from maintable group by timeseries(projectjoindate,'second')")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),sum(projectcode) from maintable group by timeseries(projectjoindate,'hour')")
+      val df =sql("select timeseries(projectjoindate,'day'),sum(projectcode) from maintable group by timeseries(projectjoindate,'day')")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with simple projection with filter") {
+      val result  = sql("select timeseries(projectjoindate,'day'),projectcode from maintable where projectcode=8")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),projectcode from maintable")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),projectcode from maintable")
+      val df = sql("select timeseries(projectjoindate,'day'),projectcode from maintable where projectcode=8")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with simple projection with group by & filter - scenario 1") {
+      val result = sql("select timeseries(projectjoindate,'day'),projectcode from maintable where projectcode=8 " +
+                       "group by timeseries(projectjoindate,'day'),projectcode")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql("create datamap datamap1 on table maintable using 'mv' as " +
+          "select timeseries(projectjoindate,'second'),projectcode from maintable group by " +
+          "timeseries(projectjoindate,'second'),projectcode")
+      sql("create datamap datamap2 on table maintable using 'mv' as " +
+          "select timeseries(projectjoindate,'hour'),projectcode from maintable group by timeseries" +
+          "(projectjoindate,'hour'),projectcode")
+      val df = sql("select timeseries(projectjoindate,'day'),projectcode from maintable where projectcode=8 " +
+                   "group by timeseries(projectjoindate,'day'),projectcode")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap2"))
+      checkAnswer(result, df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with simple projection with group by & filter - scenario 2") {
+      val result  = sql("select timeseries(projectjoindate,'day'),projectcode from maintable where projectcode=8 group by timeseries(projectjoindate,'day'),projectcode")
+      sql("drop datamap if exists datamap1")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),projectcode from maintable where projectcode=1 group by timeseries(projectjoindate,'second'),projectcode")
+      val df = sql("select timeseries(projectjoindate,'day'),projectcode from maintable where projectcode=8 group by timeseries(projectjoindate,'day'),projectcode")
+      assert(!TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap1"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+    }
+
+    test("test timeseries query rollup with simple projection with alias- scenario 1") {
+      val result  = sql("select timeseries(projectjoindate,'day') as a,projectcode as b from maintable")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),projectcode from maintable")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),projectcode from maintable")
+      val df = sql("select timeseries(projectjoindate,'day') as a,projectcode as b from maintable")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with simple projection with alias- scenario 2") {
+      val result  = sql("select timeseries(projectjoindate,'day'),projectcode from maintable")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second') as a,projectcode as b from maintable")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour') as a,projectcode as b from maintable")
+      val df = sql("select timeseries(projectjoindate,'day'),projectcode from maintable")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with projection with alias and group by- scenario 1") {
+      val result  = sql("select timeseries(projectjoindate,'day') as a,sum(projectcode) as b from maintable group by timeseries(projectjoindate,'day')")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),sum(projectcode) from maintable group by timeseries(projectjoindate,'second')")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),sum(projectcode) from maintable group by timeseries(projectjoindate,'hour')")
+      val df = sql("select timeseries(projectjoindate,'day') as a,sum(projectcode) as b from maintable group by timeseries(projectjoindate,'day')")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with projection with alias and group by- scenario 2") {
+      val result  = sql("select timeseries(projectjoindate,'day'),sum(projectcode) from maintable group by timeseries(projectjoindate,'day')")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second') as a,sum(projectcode) as b from maintable group by timeseries(projectjoindate,'second')")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour') as a,sum(projectcode) as b from maintable group by timeseries(projectjoindate,'hour')")
+      val df = sql("select timeseries(projectjoindate,'day'),sum(projectcode) from maintable group by timeseries(projectjoindate,'day')")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("rollup not supported for join queries") {
+      sql("drop table if exists maintable1")
+      sql("CREATE TABLE maintable1 (empno int,empname string, projectcode int, projectjoindate " +
+        "Timestamp,salary double) STORED AS CARBONDATA")
+      loadData("maintable1")
+      val result = sql("select timeseries(t1.projectjoindate,'day'),count(timeseries(t2.projectjoindate,'day')),sum(t2.projectcode) from maintable t1 inner join maintable1 t2 " +
+          "on (timeseries(t1.projectjoindate,'day')=timeseries(t2.projectjoindate,'day')) group by timeseries(t1.projectjoindate,'day')")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql("create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(t1.projectjoindate,'second'),count(timeseries(t2.projectjoindate,'second')),sum(t2.projectcode) from maintable t1 inner join maintable1 t2 " +
+        "on (timeseries(t1.projectjoindate,'second')=timeseries(t2.projectjoindate,'second')) group by timeseries(t1.projectjoindate,'second')")
+      sql("create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(t1.projectjoindate,'hour'),count(timeseries(t2.projectjoindate,'hour')),sum(t2.projectcode) from maintable t1 inner join maintable1 t2 " +
+        "on (timeseries(t1.projectjoindate,'hour')=timeseries(t2.projectjoindate,'hour')) group by timeseries(t1.projectjoindate,'hour')")
+      val df = sql("select timeseries(t1.projectjoindate,'day'),count(timeseries(t2.projectjoindate,'day')),sum(t2.projectcode) from maintable t1 inner join maintable1 t2 " +
+          "on (timeseries(t1.projectjoindate,'day')=timeseries(t2.projectjoindate,'day')) group by timeseries(t1.projectjoindate,'day')")
+      assert(!TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("rollup not supported for timeseries udf in filter") {
+      val result  = sql("select timeseries(projectjoindate,'day'),sum(projectcode) from maintable where timeseries(projectjoindate,'day')='2016-02-23 00:00:00' group by timeseries(projectjoindate,'day')")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),sum(projectcode) from maintable group by timeseries(projectjoindate,'second')")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),sum(projectcode) from maintable group by timeseries(projectjoindate,'hour')")
+      val df = sql("select timeseries(projectjoindate,'day'),sum(projectcode) from maintable where timeseries(projectjoindate,'day')='2016-02-23 00:00:00' group by timeseries(projectjoindate,'day')")
+      assert(!TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    def drop(): Unit = {
+      sql("drop table if exists maintable")
+    }
+
+    def createTable(): Unit = {
+      sql(
+        "CREATE TABLE maintable (empno int,empname string, projectcode int, projectjoindate " +
+        "Timestamp,salary double) STORED AS CARBONDATA")
+    }
+
+    def loadData(table: String): Unit = {
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/mv_sampledata.csv' INTO TABLE $table  OPTIONS
+           |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    }
+}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
index 55d3c5c..246ca0d 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
@@ -96,6 +96,24 @@ abstract class ModularPlan
     _rewritten
   }
 
+  private var rolledUp: Boolean = false
+
+  /**
+   * Marks this plan as rolledup plan
+   */
+  private[mv] def setRolledUp(): ModularPlan = {
+    rolledUp = true
+    children.foreach(_.setRolledUp())
+    this
+  }
+
+  /**
+   * Returns true if plan is rolledup
+   */
+  def isRolledUp: Boolean = {
+    rolledUp
+  }
+
   private var _skip: Boolean = false
 
   private[mv] def setSkip(): ModularPlan = {
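
The rolledUp flag above follows the same pattern as the existing _rewritten and
_skip flags; a stripped-down, plain-Scala stand-in for the propagation:

    // setRolledUp marks the node and every descendant, so any sub-plan later
    // answers isRolledUp == true
    abstract class Node(val children: Seq[Node]) {
      private var rolledUp = false
      def setRolledUp(): this.type = {
        rolledUp = true
        children.foreach(_.setRolledUp())
        this
      }
      def isRolledUp: Boolean = rolledUp
    }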
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala
index 8c799fe..e85c238 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala
@@ -18,7 +18,7 @@
 package org.apache.carbondata.mv
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, PredicateHelper, ScalaUDF}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 import org.apache.carbondata.mv.plans.modular.ModularPlan
@@ -42,8 +42,36 @@ package object plans {
       expr.references.subsetOf(plan.outputSet)
     }
 
+
+    /**
+     * If exp is a ScalaUDF, then for each of its children, check whether the
+     * children can be derived from the children of another ScalaUDF in exprList
+     * @param exp scalaUDF
+     * @param exprList predicate and rejoin output list
+     * @return true if the udf can be derived from another udf
+     */
+    def canEvaluate(exp: ScalaUDF, exprList: Seq[Expression]): Boolean = {
+      var canBeDerived = false
+      exprList.forall {
+        case udf: ScalaUDF =>
+          if (udf.children.length == exp.children.length) {
+            if (udf.children.zip(exp.children).forall(e => e._1.sql.equalsIgnoreCase(e._2.sql))) {
+              canBeDerived = true
+            }
+          }
+          canBeDerived
+        case _ =>
+          canBeDerived
+      }
+    }
+
     def canEvaluate(expr: Expression, exprList: Seq[Expression]): Boolean = {
-      expr.references.subsetOf(AttributeSet(exprList))
+      expr match {
+        case exp: ScalaUDF =>
+          canEvaluate(exp, exprList)
+        case _ =>
+          expr.references.subsetOf(AttributeSet(exprList))
+      }
     }
   }
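
The intent of the ScalaUDF overload above, restated compactly with exists: a
query-side udf is derivable when some udf in exprList has the same arity and
pairwise-identical child SQL (a sketch, not the committed implementation):

    import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}

    def derivable(exp: ScalaUDF, exprList: Seq[Expression]): Boolean =
      exprList.exists {
        case udf: ScalaUDF =>
          udf.children.length == exp.children.length &&
            udf.children.zip(exp.children).forall {
              case (a, b) => a.sql.equalsIgnoreCase(b.sql)
            }
        case _ => false
      }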
 
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
index 366284b..9112005 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
@@ -106,9 +106,9 @@ trait Printers {
     def printSelect(select: Seq[NamedExpression], flags: FlagSet): Unit = {
       if (flags.hasFlag(DISTINCT)) {
         print("SELECT DISTINCT %s "
-          .format(select.map(_.sql) mkString ", "))
+          .format(formatUDF(select)))
       } else {
-        print("SELECT %s ".format(select.map(_.sql) mkString ", "))
+        print("SELECT %s ".format(formatUDF(select)))
       }
     }
 
@@ -153,7 +153,7 @@ trait Printers {
     def printGroupby(groupby: (Seq[Expression], Seq[Seq[Expression]])): Unit = {
 
       if (groupby._1.nonEmpty) {
-        print("GROUP BY %s".format(groupby._1.map(_.sql).mkString(", ")))
+        print("GROUP BY %s".format(formatUDFinExpression(groupby._1)))
         if (groupby._2.nonEmpty) {
           print(" GROUPING SETS(%s)"
             .format(groupby._2.map(e => s"(${ e.map(_.sql).mkString(", ") })").mkString(", ")))
@@ -343,5 +343,55 @@ trait Printers {
     newSQLFragmentOneLinePrinter(
       new PrintWriter(stream))
   }
+
+  def formatUDF(select: Seq[NamedExpression]): String = {
+    val result = select.map {
+      // if it is a Scala UDF, do not emit its sql directly
+      case a@Alias(child: ScalaUDF, _) =>
+        val udf = child.udfName
+        if (udf.isDefined) {
+          udf.get + "(" + (formatExpressionsInUDF(child.children)) + " ) AS `" + a.name + "`"
+        } else {
+          child.sql
+        }
+      case other =>
+        other.sql
+    }
+    result mkString ", "
+  }
+
+  def formatExpressionsInUDF(exp: Seq[Expression]): String = {
+    val result = exp.map {
+      case attr: AttributeReference =>
+        if (attr.name.startsWith("gen_subsumer_")) {
+          attr.name
+        } else {
+          attr.sql
+        }
+      case literal: Literal =>
+        if (literal.value.toString.startsWith("`") || literal.value.toString.startsWith("'")) {
+          literal.value.toString
+        } else {
+          literal.sql
+        }
+      case other =>
+        other.sql
+    }
+    result mkString ","
+  }
+
+  def formatUDFinExpression(select: Seq[Expression]): String = {
+    val result = select.map {
+      case udf: ScalaUDF =>
+        if (udf.udfName.isDefined) {
+          udf.udfName.get + "(" + formatExpressionsInUDF(udf.children) + " )"
+        } else {
+          udf.sql
+        }
+      case other =>
+        other.sql
+    }
+    result mkString ", "
+  }
 }
 // scalastyle:on println
diff --git a/docs/datamap/mv-datamap-guide.md b/docs/datamap/mv-datamap-guide.md
index 1e5a0bc..dd218ee 100644
--- a/docs/datamap/mv-datamap-guide.md
+++ b/docs/datamap/mv-datamap-guide.md
@@ -24,6 +24,7 @@
 * [Compaction](#compacting-mv-datamap)
 * [Data Management](#data-management-with-mv-tables)
 * [MV TimeSeries Support](#mv-timeseries-support)
+* [MV TimeSeries RollUp Support](#mv-timeseries-rollup-support)
 
 ## Quick example
 
@@ -235,4 +236,36 @@ Timeseries queries with Date type support's only year, month, day and week granu
 
  **NOTE**:
  1. Single select statement cannot contain timeseries udf(s) neither with different granularity nor
- with different timestamp/date columns.
\ No newline at end of file
+ with different timestamp/date columns.
+ 
+ ## MV TimeSeries RollUp Support
+  MV TimeSeries queries can be rolled up from existing mv datamaps.
+  ### Query RollUp
+ Consider an example where the query is at hour-level granularity, but no hour-level
+ datamap is present while a minute-level datamap is. In that case the data can be read
+ from the minute-level datamap and aggregated up to hour level to produce the output.
+ This is called query rollup.
+ 
+ Suppose the user creates the below timeseries datamap,
+   ```
+   CREATE DATAMAP agg_sales
+   ON TABLE sales
+   USING "MV"
+   AS
+     SELECT timeseries(order_time,'minute'),avg(price)
+     FROM sales
+     GROUP BY timeseries(order_time,'minute')
+   ```
+ and fires the below query at hour-level granularity.
+   ```
+    SELECT timeseries(order_time,'hour'),avg(price)
+    FROM sales
+    GROUP BY timeseries(order_time,'hour')
+   ```
+ Then the above query can be rolled up from the 'agg_sales' mv datamap by adding an
+ hour-level timeseries aggregation on top of the minute-level datamap. Users can run
+ the EXPLAIN command, as shown below, to check whether a query is rolled up from existing mv datamaps.
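+ For example (illustrative; the plan output itself is omitted here):
+   ```
+   EXPLAIN SELECT timeseries(order_time,'hour'),avg(price)
+   FROM sales
+   GROUP BY timeseries(order_time,'hour')
+   ```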
+ 
+  **NOTE**:
+  1. Queries cannot be rolled up if the filter contains a timeseries function.
+  2. RollUp is not yet supported for queries having a join clause or order-by functions.
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
index 6182a94..3135b99 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -52,6 +52,14 @@ object CarbonToSparkAdapter {
       metadata)(exprId, qualifier)
   }
 
+  def createAttributeReference(attr: AttributeReference,
+      attrName: String,
+      newSubsume: String): AttributeReference = {
+    AttributeReference(attrName, attr.dataType)(
+      exprId = attr.exprId,
+      qualifier = Some(newSubsume))
+  }
+
   def createScalaUDF(s: ScalaUDF, reference: AttributeReference): ScalaUDF = {
     ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
   }
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
index 149a9ca..910c30d 100644
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -73,6 +73,14 @@ object CarbonToSparkAdapter {
       metadata)(exprId, qualifier)
   }
 
+  def createAttributeReference(attr: AttributeReference,
+      attrName: String,
+      newSubsume: String): AttributeReference = {
+    AttributeReference(attrName, attr.dataType)(
+      exprId = attr.exprId,
+      qualifier = newSubsume.split("\n").map(_.trim))
+  }
+
   def createScalaUDF(s: ScalaUDF, reference: AttributeReference) = {
     ScalaUDF(s.function, s.dataType, Seq(reference), s.inputsNullSafe, s.inputTypes)
   }