Posted to commits@carbondata.apache.org by aj...@apache.org on 2019/11/18 08:42:52 UTC

[carbondata] branch master updated: [CARBONDATA-3530] Support Create timeseries MV Datamap with the supported granularity levels

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 93c52ed  [CARBONDATA-3530] Support Create timeseries MV Datamap with the supported granularity levels
93c52ed is described below

commit 93c52ed6df6836db1387e658056470da06a097dc
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Sun Sep 29 22:09:24 2019 +0530

    [CARBONDATA-3530] Support Create timeseries MV Datamap with the supported granularity levels
    
    Use the existing MV DataMapProvider to create datamaps for timeseries
    queries with the supported granularity levels.

    This PR supports:

    1. Creating an MV datamap with timeseries queries (see the example
    after this list). Syntax for creating an mv-timeseries datamap:
    CREATE DATAMAP <datamap_name> ON TABLE <table_name> USING 'mv' AS
    SELECT timeseries(time_column, 'granularity'), ... FROM <table_name>
    GROUP BY timeseries(time_column, 'granularity')
    Queries containing the timeseries udf on a timestamp/date column can
    be created as an MV datamap.

    2. Added validations for the provided timestamp column and granularity.
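
    For illustration only (not part of the commit message): a minimal
    sketch in Scala, assuming a CarbonData-enabled SparkSession named
    `spark` and a hypothetical table `sales` with a Timestamp column
    `order_time`:

        // Create a non-lazy MV datamap that pre-aggregates price per hour.
        spark.sql(
          "CREATE DATAMAP sales_hourly ON TABLE sales USING 'mv' AS " +
          "SELECT timeseries(order_time, 'hour'), sum(price) FROM sales " +
          "GROUP BY timeseries(order_time, 'hour')")
        // A matching user query is then rewritten to read from the datamap table.
        spark.sql(
          "SELECT timeseries(order_time, 'hour'), sum(price) FROM sales " +
          "GROUP BY timeseries(order_time, 'hour')").show()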
    
    This closes #3437
---
 .../core/datamap/DataMapStoreManager.java          |  34 ++--
 .../core/metadata/schema/table/DataMapSchema.java  |  13 ++
 .../carbondata/mv/datamap/MVDataMapProvider.scala  |   6 +-
 .../apache/carbondata/mv/datamap/MVHelper.scala    | 122 ++++++++++-
 .../TestMVTimeSeriesCreateDataMapCommand.scala     | 224 +++++++++++++++++++++
 docs/datamap/mv-datamap-guide.md                   |  23 +++
 .../command/timeseries/TimeSeriesUtil.scala        |  42 +++-
 7 files changed, 445 insertions(+), 19 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 7ee48cc..5d247f9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -94,6 +94,8 @@ public final class DataMapStoreManager {
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
 
+  private final Object lockObject = new Object();
+
   private DataMapStoreManager() {
 
   }
@@ -233,19 +235,24 @@ public final class DataMapStoreManager {
    * @param dataMapProvider
    * @param dataMapSchema
    */
-  public synchronized void registerDataMapCatalog(DataMapProvider dataMapProvider,
+  public void registerDataMapCatalog(DataMapProvider dataMapProvider,
       DataMapSchema dataMapSchema) throws IOException {
-    initializeDataMapCatalogs(dataMapProvider);
-    String name = dataMapSchema.getProviderName();
-    DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name);
-    if (dataMapCatalog == null) {
-      dataMapCatalog = dataMapProvider.createDataMapCatalog();
-      if (dataMapCatalog != null) {
-        dataMapCatalogs.put(name.toLowerCase(), dataMapCatalog);
+    synchronized (lockObject) {
+      initializeDataMapCatalogs(dataMapProvider);
+      String name = dataMapSchema.getProviderName().toLowerCase();
+      DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name);
+      if (dataMapCatalog == null) {
+        dataMapCatalog = dataMapProvider.createDataMapCatalog();
+        // For MVDataMapProvider, createDataMapCatalog returns a SummaryDatasetCatalog
+        // instance, which needs to be added to dataMapCatalogs.
+        // Other datamap providers return null, so there is nothing to register.
+        if (dataMapCatalog != null) {
+          dataMapCatalogs.put(name, dataMapCatalog);
+          dataMapCatalog.registerSchema(dataMapSchema);
+        }
+      } else {
         dataMapCatalog.registerSchema(dataMapSchema);
       }
-    } else {
-      dataMapCatalog.registerSchema(dataMapSchema);
     }
   }
 
@@ -257,7 +264,7 @@ public final class DataMapStoreManager {
     if (dataMapCatalogs == null) {
       return;
     }
-    String name = dataMapSchema.getProviderName();
+    String name = dataMapSchema.getProviderName().toLowerCase();
     DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name);
     if (dataMapCatalog != null) {
       dataMapCatalog.unregisterSchema(dataMapSchema.getDataMapName());
@@ -272,7 +279,7 @@ public final class DataMapStoreManager {
   public synchronized DataMapCatalog getDataMapCatalog(DataMapProvider dataMapProvider,
       String providerName) throws IOException {
     initializeDataMapCatalogs(dataMapProvider);
-    return dataMapCatalogs.get(providerName);
+    return dataMapCatalogs.get(providerName.toLowerCase());
   }
 
   /**
@@ -286,7 +293,8 @@ public final class DataMapStoreManager {
       for (DataMapSchema schema : dataMapSchemas) {
         if (schema.getProviderName()
             .equalsIgnoreCase(dataMapProvider.getDataMapSchema().getProviderName())) {
-          DataMapCatalog dataMapCatalog = dataMapCatalogs.get(schema.getProviderName());
+          DataMapCatalog dataMapCatalog =
+              dataMapCatalogs.get(schema.getProviderName().toLowerCase());
           if (dataMapCatalog == null) {
             dataMapCatalog = dataMapProvider.createDataMapCatalog();
             if (null == dataMapCatalog) {
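
For illustration (not part of the committed diff): the hunks above replace
method-level synchronization with an explicit lock object and lower-case
provider names before using them as map keys. A minimal standalone Scala
sketch of the resulting register-or-reuse pattern, with simplified stand-in
types (not the actual CarbonData classes):

    import java.util.concurrent.ConcurrentHashMap

    class Catalog {  // stand-in for DataMapCatalog (hypothetical)
      def registerSchema(schemaName: String): Unit = ()
    }

    object CatalogRegistry {
      private val lockObject = new Object
      private val catalogs = new ConcurrentHashMap[String, Catalog]()

      def register(providerName: String, schemaName: String,
          createCatalog: () => Option[Catalog]): Unit = lockObject.synchronized {
        val key = providerName.toLowerCase  // keys are always lower-cased
        Option(catalogs.get(key)) match {
          case Some(catalog) => catalog.registerSchema(schemaName)
          case None =>
            // Only providers that supply a catalog (e.g. MV) get registered;
            // others return None, mirroring the null check in the hunk above.
            createCatalog().foreach { catalog =>
              catalogs.put(key, catalog)
              catalog.registerSchema(schemaName)
            }
        }
      }
    }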
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index 2a93f41..69ff837 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -91,6 +91,11 @@ public class DataMapSchema implements Serializable, Writable {
    */
   private Map<Integer, String> columnsOrderMap;
 
+  /**
+   * true if the ctas query of this datamap is a timeseries query
+   */
+  private boolean isTimeSeries;
+
   public DataMapSchema(String dataMapName, String providerName) {
     this.dataMapName = dataMapName;
     this.providerName = providerName;
@@ -279,4 +284,12 @@ public class DataMapSchema implements Serializable, Writable {
   public void setColumnsOrderMap(Map<Integer, String> columnsOrderMap) {
     this.columnsOrderMap = columnsOrderMap;
   }
+
+  public boolean isTimeSeries() {
+    return isTimeSeries;
+  }
+
+  public void setTimeSeries(boolean timeSeries) {
+    isTimeSeries = timeSeries;
+  }
 }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
index b2d9d3b..15508fd 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -58,7 +58,11 @@ class MVDataMapProvider(
       throw new MalformedDataMapCommandException(
         "select statement is mandatory")
     }
-    MVHelper.createMVDataMap(sparkSession, dataMapSchema, ctasSqlStatement, true)
+    MVHelper.createMVDataMap(sparkSession,
+      dataMapSchema,
+      ctasSqlStatement,
+      true,
+      mainTable)
     try {
       DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema)
       if (dataMapSchema.isLazy) {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 5a88c43..7ff4c8a 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -25,22 +25,25 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, NamedExpression, ScalaUDF, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, Limit, LogicalPlan, Project}
 import org.apache.spark.sql.execution.command.{Field, PartitionerField, TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
+import org.apache.spark.sql.execution.command.timeseries.{TimeSeriesFunction, TimeSeriesUtil}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-import org.apache.spark.sql.types.{ArrayType, MapType, StructType}
+import org.apache.spark.sql.types.{ArrayType, DateType, MapType, StructType}
 import org.apache.spark.util.{DataMapUtil, PartitionUtils}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, RelationIdentifier}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.datamap.DataMapManager
 import org.apache.carbondata.mv.plans.modular._
 import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite, SummaryDatasetCatalog}
@@ -54,7 +57,8 @@ object MVHelper {
   def createMVDataMap(sparkSession: SparkSession,
       dataMapSchema: DataMapSchema,
       queryString: String,
-      ifNotExistsSet: Boolean = false): Unit = {
+      ifNotExistsSet: Boolean = false,
+      mainTable: CarbonTable): Unit = {
     val dmProperties = dataMapSchema.getProperties.asScala
     if (dmProperties.contains("streaming") && dmProperties("streaming").equalsIgnoreCase("true")) {
       throw new MalformedCarbonCommandException(
@@ -79,6 +83,9 @@ object MVHelper {
         s"Non-Carbon table does not support creating MV datamap")
     }
     val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
+    val (timeSeriesColumn, granularity): (String, String) = validateMVTimeSeriesQuery(
+      logicalPlan,
+      dataMapSchema)
     val fullRebuild = isFullReload(logicalPlan)
     var counter = 0
     // the ctas query can have duplicate columns, so we should take distinct and create fields,
@@ -138,6 +145,17 @@ object MVHelper {
       }
       parentTablesList.add(mainCarbonTable.get)
     }
+
+    // Check if a load is in progress on any of the parent tables mapped to the datamap
+    parentTablesList.asScala.foreach {
+      parentTable =>
+        if (SegmentStatusManager.isLoadInProgressInTable(parentTable)) {
+          throw new UnsupportedOperationException(
+            "Cannot create mv datamap table when insert is in progress on parent table: " +
+            parentTable.getTableName)
+        }
+    }
+
     tableProperties.put(CarbonCommonConstants.DATAMAP_NAME, dataMapSchema.getDataMapName)
     tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(","))
 
@@ -152,6 +170,34 @@ object MVHelper {
           fields,
           fieldRelationMap,
           tableProperties)
+      if (granularity != null) {
+        if (null != mainTable) {
+          if (!mainTable.getTableName.equalsIgnoreCase(parentTablesList.get(0).getTableName)) {
+            throw new MalformedCarbonCommandException(
+              "Parent table name is different in Create and Select Statement")
+          }
+        }
+        val timeSeriesDataType = parentTablesList
+          .get(0)
+          .getTableInfo
+          .getFactTable
+          .getListOfColumns
+          .asScala
+          .filter(columnSchema => columnSchema.getColumnName
+            .equalsIgnoreCase(timeSeriesColumn))
+          .head
+          .getDataType
+        if (timeSeriesDataType.equals(DataTypes.DATE) ||
+            timeSeriesDataType.equals(DataTypes.TIMESTAMP)) {
+          // if the column is of Date type, check that the given granularity is valid for Date
+          if (timeSeriesDataType.equals(DataTypes.DATE)) {
+            TimeSeriesUtil.validateTimeSeriesGranularityForDate(granularity)
+          }
+        } else {
+          throw new MalformedCarbonCommandException(
+            "TimeSeries Column must be of TimeStamp or Date type")
+        }
+      }
     }
     dmProperties.foreach(t => tableProperties.put(t._1, t._2))
     val usePartitioning = dmProperties.getOrElse("partitioning", "true").toBoolean
@@ -226,7 +272,12 @@ object MVHelper {
     }
     dataMapSchema.setMainTableColumnList(mainTableToColumnsMap)
     dataMapSchema.setColumnsOrderMap(columnOrderMap)
-    dataMapSchema.setCtasQuery(updatedQueryWithDb)
+    if (null != granularity && null != timeSeriesColumn) {
+      dataMapSchema.setCtasQuery(queryString)
+      dataMapSchema.setTimeSeries(true)
+    } else {
+      dataMapSchema.setCtasQuery(updatedQueryWithDb)
+    }
     dataMapSchema
       .setRelationIdentifier(new RelationIdentifier(tableIdentifier.database.get,
         tableIdentifier.table,
@@ -840,5 +891,68 @@ object MVHelper {
       case _ => outputList
     }.get
   }
+
+  /**
+   * Validate mv timeseries query for timeseries column and granularity.
+   * The timeseries udf's column argument is either a Timestamp attribute or a Date cast to Timestamp
+   *
+   * @param logicalPlan   to be validated
+   * @param dataMapSchema to check whether it is a lazy/non-lazy datamap
+   * @return (timeseries column, granularity); both null if the query has no timeseries udf
+   */
+  private def validateMVTimeSeriesQuery(logicalPlan: LogicalPlan,
+      dataMapSchema: DataMapSchema): (String, String) = {
+    var timeSeriesColumn: String = null
+    var granularity: String = null
+    logicalPlan.transformExpressions {
+      case alias@Alias(udf: ScalaUDF, _) =>
+        if (udf.function.isInstanceOf[TimeSeriesFunction]) {
+          if (null == timeSeriesColumn && null == granularity) {
+            udf.children.collect {
+              case attr: AttributeReference =>
+                timeSeriesColumn = attr.name
+              case l: Literal =>
+                granularity = l.value.toString
+              case c: Cast =>
+                c.child match {
+                  case attribute: AttributeReference =>
+                    if (attribute.dataType.isInstanceOf[DateType]) {
+                      timeSeriesColumn = attribute.name
+                    }
+                  case _ =>
+                }
+            }
+          } else {
+            udf.children.collect {
+              case attr: AttributeReference =>
+                if (!attr.name.equalsIgnoreCase(timeSeriesColumn)) {
+                  throw new MalformedCarbonCommandException(
+                    "Multiple timeseries udf functions are defined in Select statement with " +
+                    "different timestamp columns")
+                }
+              case l: Literal =>
+                if (!granularity.equalsIgnoreCase(l.value.toString)) {
+                  throw new MalformedCarbonCommandException(
+                    "Multiple timeseries udf functions are defined in Select statement with " +
+                    "different granularities")
+                }
+            }
+          }
+        }
+        alias
+    }
+    // when both a timeseries column and a granularity were found, validate them
+    if (null != timeSeriesColumn && null != granularity) {
+      if (dataMapSchema.isLazy) {
+        throw new MalformedCarbonCommandException(
+          "MV TimeSeries queries does not support Lazy Rebuild")
+      }
+      TimeSeriesUtil.validateTimeSeriesGranularity(granularity)
+    } else if (null == timeSeriesColumn && null != granularity) {
+      throw new MalformedCarbonCommandException(
+        "MV TimeSeries is only supported on Timestamp/Date column")
+    }
+    (timeSeriesColumn, granularity)
+  }
 }
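
For illustration (not part of the committed diff): the checks that
validateMVTimeSeriesQuery performs over Spark expressions can be condensed,
over plain (column, granularity) pairs, into roughly the following
hypothetical helper (require is used here in place of
MalformedCarbonCommandException for brevity):

    // Every timeseries(column, granularity) usage in the query must agree
    // with the first one; otherwise datamap creation fails.
    def checkTimeSeriesUsages(usages: Seq[(String, String)]): (String, String) =
      usages match {
        case Seq() => (null, null)  // not a timeseries query
        case (column, granularity) +: rest =>
          rest.foreach { case (c, g) =>
            require(c.equalsIgnoreCase(column),
              "Multiple timeseries udf functions are defined in Select " +
              "statement with different timestamp columns")
            require(g.equalsIgnoreCase(granularity),
              "Multiple timeseries udf functions are defined in Select " +
              "statement with different granularities")
          }
          (column, granularity)
      }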
 
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala
new file mode 100644
index 0000000..4feab2f
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala
@@ -0,0 +1,224 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.mv.timeseries
+
+import java.util.concurrent.{Callable, Executors, TimeUnit}
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.mv.rewrite.TestUtil
+
+class TestMVTimeSeriesCreateDataMapCommand extends QueryTest with BeforeAndAfterAll {
+
+  private val timestampFormat = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+
+  override def beforeAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    drop()
+    sql("CREATE TABLE maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
+        "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable  OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+  }
+
+  def drop(): Unit = {
+    sql("drop table if exists products")
+    sql("drop table IF EXISTS main_table")
+    sql("drop table IF EXISTS maintable")
+  }
+
+  test("test mv_timeseries create datamap") {
+    sql("drop datamap if exists datamap1")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv'" +
+      " as select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')")
+    val result = sql("show datamap on table maintable").collectAsList()
+    assert(result.get(0).get(0).toString.equalsIgnoreCase("datamap1"))
+    assert(result.get(0).get(4).toString.equalsIgnoreCase("ENABLED"))
+    val df = sql("select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')")
+    assert(TestUtil.verifyMVDataMap(df.queryExecution.analyzed, "datamap1"))
+    sql("drop datamap if exists datamap1")
+  }
+
+  test("test mv_timeseries create lazy datamap") {
+    sql("drop datamap if exists datamap1")
+    intercept[MalformedCarbonCommandException] {
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' with deferred rebuild as " +
+        "select timeseries(projectjoindate,'second') from maintable group by timeseries(projectjoindate,'second')")
+    }.getMessage.contains("MV TimeSeries queries does not support Lazy Rebuild")
+  }
+
+  test("test mv_timeseries create datamap with multiple granularity") {
+    sql("drop datamap if exists datamap1")
+    intercept[MalformedCarbonCommandException] {
+      sql(
+        "create datamap datamap1 on table maintable using 'mv'  as " +
+        "select timeseries(projectjoindate,'second'), timeseries(projectjoindate,'hour') from maintable")
+    }.getMessage.contains("Multiple timeseries udf functions are defined in Select statement with different granularities")
+  }
+
+  test("test mv_timeseries create datamap with date type as timeseries_column") {
+    sql("drop table IF EXISTS maintable_new")
+    sql("CREATE TABLE maintable_new (projectcode int, projectjoindate date, projectenddate Timestamp,attendance int) " +
+        "STORED BY 'org.apache.carbondata.format'")
+    sql("drop datamap if exists datamap1")
+    sql(
+      "create datamap datamap1 on table maintable_new using 'mv' as " +
+      "select timeseries(projectjoindate,'day') from maintable_new")
+    val result = sql("show datamap on table maintable_new").collectAsList()
+    assert(result.get(0).get(0).toString.equalsIgnoreCase("datamap1"))
+    assert(result.get(0).get(4).toString.equalsIgnoreCase("ENABLED"))
+    sql("drop table IF EXISTS maintable_new")
+  }
+
+  test("test mv_timeseries create datamap with date type as timeseries_column with incorrect granularity") {
+    sql("drop table IF EXISTS maintable_new")
+    sql("CREATE TABLE maintable_new (projectcode int, projectjoindate date, projectenddate Timestamp,attendance int) " +
+        "STORED BY 'org.apache.carbondata.format'")
+    sql("drop datamap if exists datamap1")
+    intercept[MalformedCarbonCommandException] {
+      sql(
+        "create datamap datamap1 on table maintable_new using 'mv' as " +
+        "select timeseries(projectjoindate,'second') from maintable_new")
+    }.getMessage
+      .contains("Granularity should be DAY,MONTH or YEAR, for timeseries column of Date type")
+    sql("drop table IF EXISTS maintable_new")
+  }
+
+  test("test mv_timeseries create datamap - Parent table name is different in Create and Select Statement") {
+    sql("drop table if exists main_table")
+    sql("CREATE TABLE main_table (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
+        "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql("drop datamap if exists datamap1")
+    intercept[MalformedCarbonCommandException] {
+      sql(
+        "create datamap datamap1 on table main_table using 'mv' as " +
+        "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')")
+    }.getMessage.contains("Parent table name is different in Create and Select Statement")
+    sql("drop table if exists main_table")
+  }
+
+  test("test mv_timeseries for same event_column with different granularities") {
+    def dropDataMaps = {
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql("drop datamap if exists datamap3")
+      sql("drop datamap if exists datamap4")
+      sql("drop datamap if exists datamap5")
+    }
+    dropDataMaps
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')")
+    sql(
+      "create datamap datamap2 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'hour'), sum(projectcode) from maintable group by timeseries(projectjoindate,'hour')")
+    sql(
+      "create datamap datamap3 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'minute'), sum(projectcode) from maintable group by timeseries(projectjoindate,'minute')")
+    sql(
+      "create datamap datamap4 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'day'), sum(projectcode) from maintable group by timeseries(projectjoindate,'day')")
+    sql(
+      "create datamap datamap5 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'year'), sum(projectcode) from maintable group by timeseries(projectjoindate,'year')")
+    dropDataMaps
+  }
+
+  test("test mv_timeseries create datamap with more event_columns") {
+    sql("drop datamap if exists datamap1")
+    intercept[MalformedCarbonCommandException] {
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'), timeseries(projectenddate,'hour') from maintable")
+    }.getMessage.contains(
+        "Multiple timeseries udf functions are defined in Select statement with different timestamp columns")
+  }
+
+  test("test mv_timeseries create datamap with same granularity and different ctas") {
+    sql("drop datamap if exists datamap1")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')")
+    sql("drop datamap if exists datamap2")
+    sql(
+      "create datamap datamap2 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable where projectjoindate='29-06-2008 00:00:00.0' " +
+      "group by timeseries(projectjoindate,'second')")
+    sql("drop datamap if exists datamap1")
+    sql("drop datamap if exists datamap2")
+  }
+
+  test("insert and create datamap in progress") {
+    sql("drop datamap if exists datamap1")
+    val query = s"LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable  " +
+                s"OPTIONS('DELIMITER'= ',')"
+    val executorService = Executors.newFixedThreadPool(4)
+    executorService.submit(new QueryTask(query))
+    intercept[UnsupportedOperationException] {
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'year'), sum(projectcode) from maintable group by timeseries(projectjoindate,'year')")
+    }.getMessage
+      .contains("Cannot create mv datamap table when insert is in progress on parent table: maintable")
+    executorService.shutdown()
+    executorService.awaitTermination(2, TimeUnit.HOURS)
+    sql("drop datamap if exists datamap1")
+  }
+
+  test("test create datamap with incorrect timeseries_column and granularity") {
+    sql("drop datamap if exists datamap2")
+    intercept[MalformedCarbonCommandException] {
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'time'), sum(projectcode) from maintable group by timeseries(projectjoindate,'time')")
+    }.getMessage.contains("Granularity time is invalid")
+    intercept[MalformedCarbonCommandException] {
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(empname,'second'), sum(projectcode) from maintable group by timeseries(empname,'second')")
+    }.getMessage.contains("MV Timeseries is only supported on Timestamp/Date column")
+  }
+
+  class QueryTask(query: String) extends Callable[String] {
+    override def call(): String = {
+      var result = "PASS"
+      try {
+        sql(query).collect()
+      } catch {
+        case exception: Exception => LOGGER.error(exception.getMessage)
+          result = "FAIL"
+      }
+      result
+    }
+  }
+
+  override def afterAll(): Unit = {
+    drop()
+    if (null != timestampFormat) {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timestampFormat)
+    }
+  }
+}
diff --git a/docs/datamap/mv-datamap-guide.md b/docs/datamap/mv-datamap-guide.md
index 4849a89..dfa8618 100644
--- a/docs/datamap/mv-datamap-guide.md
+++ b/docs/datamap/mv-datamap-guide.md
@@ -23,6 +23,7 @@
 * [Querying Data](#querying-data)
 * [Compaction](#compacting-mv-datamap)
 * [Data Management](#data-management-with-mv-tables)
+* [MV TimeSeries Support](#mv-timeseries-support)
 
 ## Quick example
 
@@ -208,3 +209,25 @@ release, user can do as following:
 2. Carry out the data management operation on main table
 3. Create the mv datamap table again by `CREATE DATAMAP` command
 Basically, user can manually trigger the operation by re-building the datamap.
+
+## MV TimeSeries Support
+Non-lazy MV datamaps support TimeSeries queries. The supported granularity strings are: year, month, day, week,
+hour, thirty_minute, fifteen_minute, minute and second.
+
+ User can create an MV datamap with timeseries queries, as in the example below:
+
+  ```
+  CREATE DATAMAP agg_sales
+  ON TABLE sales
+  USING "MV"
+  AS
+    SELECT timeseries(order_time,'second'),avg(price)
+    FROM sales
+    GROUP BY timeseries(order_time,'second')
+  ```
+The column provided in the timeseries udf must be of Timestamp or Date type.
+Timeseries queries on a Date type column support only year, month and day granularities.
+
+ **NOTE**:
+ 1. Multiple timeseries udf functions with different timestamp columns or different
+ granularities cannot be defined in the same Select statement.
\ No newline at end of file
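
For illustration (not part of the committed diff): with a hypothetical table
`events` holding a Date column `event_date`, and a CarbonData-enabled
SparkSession `spark`, the Date-type restriction above behaves like this:

    // Accepted: 'day' is a valid granularity for a Date column.
    spark.sql(
      "CREATE DATAMAP events_daily ON TABLE events USING 'mv' AS " +
      "SELECT timeseries(event_date, 'day'), count(*) FROM events " +
      "GROUP BY timeseries(event_date, 'day')")
    // Rejected: timeseries(event_date, 'second') would instead fail with
    // "Granularity should be DAY,MONTH or YEAR, for timeseries column of Date type"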
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index 97eccc6..5dd3a30 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -17,15 +17,16 @@
 package org.apache.spark.sql.execution.command.timeseries
 
 import scala.collection.mutable
+import scala.util.control.Breaks._
 
 import org.apache.spark.sql.execution.command.{DataMapField, Field}
 
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
 import org.apache.carbondata.core.metadata.schema.datamap.Granularity
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.preagg.TimeSeriesUDF
 
 /**
  * Utility class for time series to keep
@@ -161,5 +162,44 @@ object TimeSeriesUtil {
                      obj._2.aggregateFunction.isEmpty)
     isTimeSeriesColumnExits.get._2.aggregateFunction = timeSeriesFunction
   }
+
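+  /**
+   * Validate that the given granularity can be applied to a timeseries
+   * column of Date type (only DAY, MONTH and YEAR are allowed).
+   *
+   * @param timeSeriesFunction user defined granularity
+   */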
+  def validateTimeSeriesGranularityForDate(
+      timeSeriesFunction: String): Unit = {
+    for (granularity <- Granularity.values()) {
+      if (timeSeriesFunction.equalsIgnoreCase(granularity.getName
+        .substring(0, granularity.getName.indexOf(CarbonCommonConstants.UNDERSCORE)))) {
+        if (!(granularity.getName.equalsIgnoreCase(Granularity.DAY.getName) ||
+              granularity.getName.equalsIgnoreCase(Granularity.MONTH.getName) ||
+              granularity.getName.equalsIgnoreCase(Granularity.YEAR.getName))) {
+          throw new MalformedCarbonCommandException(
+            "Granularity should be DAY,MONTH or YEAR, for timeseries column of Date type")
+        }
+      }
+    }
+  }
+
+  /**
+   * validate TimeSeries Granularity
+   *
+   * @param timeSeriesFunction user defined granularity
+   */
+  def validateTimeSeriesGranularity(
+      timeSeriesFunction: String): Unit = {
+    var found = false
+    breakable {
+      for (granularity <- Granularity.values()) {
+        if (timeSeriesFunction.equalsIgnoreCase(granularity.getName
+          .substring(0, granularity.getName.indexOf(CarbonCommonConstants.UNDERSCORE)))) {
+          found = true
+          break
+        }
+      }
+    }
+    if (!found) {
+      throw new MalformedCarbonCommandException(
+        "Granularity " + timeSeriesFunction + " is invalid")
+    }
+  }
+
 }
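
For illustration (not part of the committed diff): based on the tests added
in this commit, the validator is expected to behave as sketched below:

    import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
    import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil

    TimeSeriesUtil.validateTimeSeriesGranularity("second")  // returns normally
    try {
      TimeSeriesUtil.validateTimeSeriesGranularity("time")  // not a granularity
    } catch {
      case e: MalformedCarbonCommandException =>
        assert(e.getMessage.contains("Granularity time is invalid"))
    }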