You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/07 22:39:42 UTC

[1/2] carbondata git commit: [CARBONDATA-1519][PreAgg-Timeseries] Support Query and Load on timeseries table

Repository: carbondata
Updated Branches:
  refs/heads/master 0da0a4f61 -> e2a79eebb


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index 8900847..c198613 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -169,7 +169,6 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
     catalog.ParquetConversions ::
     catalog.OrcConversions ::
     CarbonPreInsertionCasts(sparkSession) ::
-    CarbonPreAggregateDataLoadingRules ::
     CarbonIUDAnalysisRule(sparkSession) ::
     AnalyzeCreateTable(sparkSession) ::
     PreprocessTableInsertion(conf) ::
@@ -213,7 +212,8 @@ class CarbonAnalyzer(catalog: SessionCatalog,
     sparkSession: SparkSession,
     analyzer: Analyzer) extends Analyzer(catalog, conf) {
   override def execute(plan: LogicalPlan): LogicalPlan = {
-    val logicalPlan = analyzer.execute(plan)
+    var logicalPlan = analyzer.execute(plan)
+    logicalPlan = CarbonPreAggregateDataLoadingRules(logicalPlan)
     CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index 2ba6d09..8acddfa 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -148,7 +148,8 @@ class CarbonAnalyzer(catalog: SessionCatalog,
     sparkSession: SparkSession,
     analyzer: Analyzer) extends Analyzer(catalog, conf) {
   override def execute(plan: LogicalPlan): LogicalPlan = {
-    val logicalPlan = analyzer.execute(plan)
+    var logicalPlan = analyzer.execute(plan)
+    logicalPlan = CarbonPreAggregateDataLoadingRules(logicalPlan)
     CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
   }
 }
@@ -215,7 +216,6 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
         new FindDataSourceTable(session) +:
         new ResolveSQLOnFile(session) +:
         new CarbonIUDAnalysisRule(sparkSession) +:
-        CarbonPreAggregateDataLoadingRules +:
         new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
 
       override val extendedCheckRules: Seq[LogicalPlan => Unit] =


[2/2] carbondata git commit: [CARBONDATA-1519][PreAgg-Timeseries] Support Query and Load on timeseries table

Posted by ra...@apache.org.
[CARBONDATA-1519][PreAgg-Timeseries] Support Query and Load on timeseries table

This closes #1626


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e2a79eeb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e2a79eeb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e2a79eeb

Branch: refs/heads/master
Commit: e2a79eebbcbe641f657e84ccf86a9a7e84e8e735
Parents: 0da0a4f
Author: kumarvishal <ku...@gmail.com>
Authored: Tue Dec 5 20:56:58 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Dec 8 04:09:25 2017 +0530

----------------------------------------------------------------------
 .../schema/table/AggregationDataMapSchema.java  | 129 +++++++++-
 .../core/preagg/AggregateTableSelector.java     |  31 ++-
 .../carbondata/core/preagg/QueryColumn.java     |  20 +-
 .../core/preagg/TimeSeriesFunction.java         |  40 ----
 .../core/preagg/TimeSeriesFunctionEnum.java     |  53 +++++
 .../carbondata/core/preagg/TimeSeriesUDF.java   |   5 +-
 .../apache/carbondata/core/util/CarbonUtil.java |  32 +++
 .../src/test/resources/timeseriestest.csv       |   7 +
 .../TestPreAggregateTableSelection.scala        |   1 -
 .../timeseries/TestTimeSeriesCreateTable.scala  |  18 +-
 .../timeseries/TestTimeseriesDataLoad.scala     |  79 +++++++
 .../TestTimeseriesTableSelection.scala          | 131 +++++++++++
 .../carbondata/spark/util/CarbonSparkUtil.scala |   7 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   6 +-
 .../apache/spark/sql/CarbonExpressions.scala    |  16 +-
 .../preaaggregate/PreAggregateListeners.scala   |  65 +++--
 .../preaaggregate/PreAggregateUtil.scala        | 105 ++++++++-
 .../command/timeseries/TimeSeriesFunction.scala |  33 +++
 .../command/timeseries/TimeseriesUtil.scala     |  15 --
 .../spark/sql/hive/CarbonFileMetastore.scala    |   5 +-
 .../sql/hive/CarbonPreAggregateRules.scala      | 235 ++++++++++++++-----
 .../src/main/spark2.1/CarbonSessionState.scala  |   4 +-
 .../src/main/spark2.2/CarbonSessionState.scala  |   4 +-
 23 files changed, 885 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
index 9bfb22c..8f6a2d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+import org.apache.carbondata.core.preagg.TimeSeriesFunctionEnum;
 
 /**
  * data map schema class for pre aggregation
@@ -50,6 +51,17 @@ public class AggregationDataMapSchema extends DataMapSchema {
    */
   private Map<String, Set<String>> parentColumnToAggregationsMapping;
 
+  /**
+   * whether its a timeseries data map
+   */
+  private boolean isTimeseriesDataMap;
+
+  /**
+   * below ordinal will be used during sorting the data map
+   * to support rollup for loading
+   */
+  private int ordinal = Integer.MAX_VALUE;
+
   public AggregationDataMapSchema(String dataMapName, String className) {
     super(dataMapName, className);
   }
@@ -63,6 +75,28 @@ public class AggregationDataMapSchema extends DataMapSchema {
   }
 
   /**
+   * Below method will be used to get the columns on which aggregate function
+   * and time series function is not applied
+   * @param columnName
+   *                parent column name
+   * @return child column schema
+   */
+  public ColumnSchema getNonAggNonTimeseriesChildColBasedByParent(String columnName) {
+    Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName);
+    if (null != columnSchemas) {
+      Iterator<ColumnSchema> iterator = columnSchemas.iterator();
+      while (iterator.hasNext()) {
+        ColumnSchema next = iterator.next();
+        if ((null == next.getAggFunction() || next.getAggFunction().isEmpty()) && null == next
+            .getTimeSeriesFunction() || next.getTimeSeriesFunction().isEmpty()) {
+          return next;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
    * Below method will be used to get the columns on which aggregate function is not applied
    * @param columnName
    *                parent column name
@@ -74,7 +108,28 @@ public class AggregationDataMapSchema extends DataMapSchema {
       Iterator<ColumnSchema> iterator = columnSchemas.iterator();
       while (iterator.hasNext()) {
         ColumnSchema next = iterator.next();
-        if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) {
+        if ((null == next.getAggFunction() || next.getAggFunction().isEmpty())) {
+          return next;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Below method will be used to get the columns on which aggregate function is not applied
+   *
+   * @param columnName parent column name
+   * @return child column schema
+   */
+  public ColumnSchema getTimeseriesChildColBasedByParent(String columnName,
+      String timeseriesFunction) {
+    Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName);
+    if (null != columnSchemas) {
+      Iterator<ColumnSchema> iterator = columnSchemas.iterator();
+      while (iterator.hasNext()) {
+        ColumnSchema next = iterator.next();
+        if (timeseriesFunction.equals(next.getTimeSeriesFunction())) {
           return next;
         }
       }
@@ -126,6 +181,28 @@ public class AggregationDataMapSchema extends DataMapSchema {
   }
 
   /**
+   * Below method will be used to get the column schema based on parent column name
+   * @param columName
+   *                parent column name
+   * @param timeseriesFunction
+   *                timeseries function applied on column
+   * @return child column schema
+   */
+  public ColumnSchema getTimeseriesChildColByParent(String columName, String timeseriesFunction) {
+    List<ColumnSchema> listOfColumns = childSchema.getListOfColumns();
+    for (ColumnSchema columnSchema : listOfColumns) {
+      List<ParentColumnTableRelation> parentColumnTableRelations =
+          columnSchema.getParentColumnTableRelations();
+      if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1
+          && parentColumnTableRelations.get(0).getColumnName().equals(columName)
+          && timeseriesFunction.equalsIgnoreCase(columnSchema.getTimeSeriesFunction())) {
+        return columnSchema;
+      }
+    }
+    return null;
+  }
+
+  /**
    * Below method is to check if parent column with matching aggregate function
    * @param parentColumnName
    *                    parent column name
@@ -175,6 +252,15 @@ public class AggregationDataMapSchema extends DataMapSchema {
   private void fillNonAggFunctionColumns(List<ColumnSchema> listOfColumns) {
     parentToNonAggChildMapping = new HashMap<>();
     for (ColumnSchema column : listOfColumns) {
+      if (!isTimeseriesDataMap) {
+        isTimeseriesDataMap =
+            null != column.getTimeSeriesFunction() && !column.getTimeSeriesFunction().isEmpty();
+        if (isTimeseriesDataMap) {
+          this.ordinal =
+              TimeSeriesFunctionEnum.valueOf(column.getTimeSeriesFunction().toUpperCase())
+                  .getOrdinal();
+        }
+      }
       if (null == column.getAggFunction() || column.getAggFunction().isEmpty()) {
         fillMappingDetails(column, parentToNonAggChildMapping);
       }
@@ -210,4 +296,45 @@ public class AggregationDataMapSchema extends DataMapSchema {
     }
   }
 
+  public boolean isTimeseriesDataMap() {
+    return isTimeseriesDataMap;
+  }
+
+  /**
+   * Below method is to support rollup during loading the data in pre aggregate table
+   * In case of timeseries year level table data loading can be done using month level table or any
+   * time series level below year level for example day,hour minute, second.
+   * @TODO need to handle for pre aggregate table without timeseries
+   *
+   * @param aggregationDataMapSchema
+   * @return whether aggregation data map can be selected or not
+   */
+  public boolean canSelectForRollup(AggregationDataMapSchema aggregationDataMapSchema) {
+    List<ColumnSchema> listOfColumns = childSchema.getListOfColumns();
+    for (ColumnSchema column : listOfColumns) {
+      List<ParentColumnTableRelation> parentColumnTableRelations =
+          column.getParentColumnTableRelations();
+      //@TODO handle scenario when aggregate datamap columns is derive from multiple column
+      // which is not supported currently
+      if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1) {
+        if (null != column.getAggFunction() && !column.getAggFunction().isEmpty()) {
+          if (null == aggregationDataMapSchema
+              .getAggChildColByParent(parentColumnTableRelations.get(0).getColumnName(),
+                  column.getAggFunction())) {
+            return false;
+          }
+        } else {
+          if (null == aggregationDataMapSchema.getNonAggChildColBasedByParent(
+              parentColumnTableRelations.get(0).getColumnName())) {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  public int getOrdinal() {
+    return ordinal;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
index 8b87a1a..5347567 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
@@ -70,8 +70,8 @@ public class AggregateTableSelector {
         AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
         isMatch = true;
         for (QueryColumn queryColumn : projectionColumn) {
-          ColumnSchema columnSchemaByParentName = aggregationDataMapSchema
-              .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName());
+          ColumnSchema columnSchemaByParentName =
+              getColumnSchema(queryColumn, aggregationDataMapSchema);
           if (null == columnSchemaByParentName) {
             isMatch = false;
           }
@@ -95,8 +95,8 @@ public class AggregateTableSelector {
         isMatch = true;
         for (QueryColumn queryColumn : filterColumns) {
           AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
-          ColumnSchema columnSchemaByParentName = aggregationDataMapSchema
-              .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName());
+          ColumnSchema columnSchemaByParentName =
+              getColumnSchema(queryColumn, aggregationDataMapSchema);
           if (null == columnSchemaByParentName) {
             isMatch = false;
           }
@@ -132,4 +132,27 @@ public class AggregateTableSelector {
     }
     return selectedDataMapSchema;
   }
+
+  /**
+   * Below method will be used to get column schema for projection and
+   * filter query column
+   *
+   * @param queryColumn              query column
+   * @param aggregationDataMapSchema selected data map schema
+   * @return column schema
+   */
+  private ColumnSchema getColumnSchema(QueryColumn queryColumn,
+      AggregationDataMapSchema aggregationDataMapSchema) {
+    ColumnSchema columnSchemaByParentName = null;
+    if (!queryColumn.getTimeseriesFunction().isEmpty()) {
+      columnSchemaByParentName = aggregationDataMapSchema
+          .getTimeseriesChildColBasedByParent(queryColumn.getColumnSchema().getColumnName(),
+              queryColumn.getTimeseriesFunction());
+    } else {
+      columnSchemaByParentName = aggregationDataMapSchema
+          .getNonAggNonTimeseriesChildColBasedByParent(
+              queryColumn.getColumnSchema().getColumnName());
+    }
+    return columnSchemaByParentName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
index c889716..c91a703 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
@@ -44,12 +44,18 @@ public class QueryColumn {
    */
   private boolean isFilterColumn;
 
+  /**
+   * timeseries udf applied on column
+   */
+  private String timeseriesFunction;
+
   public QueryColumn(ColumnSchema columnSchema, String changedDataType, String aggFunction,
-      boolean isFilterColumn) {
+      boolean isFilterColumn, String timeseriesFunction) {
     this.columnSchema = columnSchema;
     this.changedDataType = changedDataType;
     this.aggFunction = aggFunction;
     this.isFilterColumn = isFilterColumn;
+    this.timeseriesFunction = timeseriesFunction;
   }
 
   public ColumnSchema getColumnSchema() {
@@ -68,6 +74,10 @@ public class QueryColumn {
     return isFilterColumn;
   }
 
+  public String getTimeseriesFunction() {
+    return timeseriesFunction;
+  }
+
   @Override public boolean equals(Object o) {
     if (this == o) {
       return true;
@@ -82,12 +92,18 @@ public class QueryColumn {
     if (!columnSchema.equals(that.columnSchema)) {
       return false;
     }
-    return aggFunction != null ? aggFunction.equals(that.aggFunction) : that.aggFunction == null;
+    if (!(aggFunction != null ? aggFunction.equals(that.aggFunction) : that.aggFunction == null)) {
+      return false;
+    }
+    return timeseriesFunction != null ?
+        timeseriesFunction.equals(that.timeseriesFunction) :
+        that.timeseriesFunction == null;
   }
 
   @Override public int hashCode() {
     int result = columnSchema.hashCode();
     result = 31 * result + (aggFunction != null ? aggFunction.hashCode() : 0);
+    result = 31 * result + (timeseriesFunction != null ? timeseriesFunction.hashCode() : 0);
     result = 31 * result + (isFilterColumn ? 1 : 0);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java
deleted file mode 100644
index 02ff753..0000000
--- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.preagg;
-
-/**
- * enum for timeseries function
- */
-public enum TimeSeriesFunction {
-  SECOND("second"),
-  MINUTE("minute"),
-  HOUR("hour"),
-  DAY("day"),
-  MONTH("month"),
-  YEAR("year");
-
-  private String name;
-
-  TimeSeriesFunction(String name) {
-    this.name = name;
-  }
-
-  public String getName() {
-    return name;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
new file mode 100644
index 0000000..5d0d2af
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.preagg;
+
+/**
+ * enum for timeseries function
+ */
+public enum TimeSeriesFunctionEnum {
+  SECOND("second", 0),
+  MINUTE("minute", 1),
+  HOUR("hour", 2),
+  DAY("day", 3),
+  MONTH("month", 4),
+  YEAR("year", 5);
+
+  /**
+   * name of the function
+   */
+  private String name;
+
+  /**
+   * ordinal for function
+   */
+  private int ordinal;
+
+  TimeSeriesFunctionEnum(String name, int ordinal) {
+    this.name = name;
+    this.ordinal = ordinal;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public int getOrdinal() {
+    return ordinal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
index 50cb052..3aa4190 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
@@ -66,8 +66,9 @@ public class TimeSeriesUDF {
     Calendar calendar = calanderThreadLocal.get();
     calendar.clear();
     calendar.setTimeInMillis(data.getTime());
-    TimeSeriesFunction timeSeriesFunction = TimeSeriesFunction.valueOf(function);
-    switch (timeSeriesFunction) {
+    TimeSeriesFunctionEnum timeSeriesFunctionEnum =
+        TimeSeriesFunctionEnum.valueOf(function.toUpperCase());
+    switch (timeSeriesFunctionEnum) {
       case SECOND:
         calendar.set(Calendar.MILLISECOND, 0);
         break;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ab85684..148098d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -67,7 +67,9 @@ import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
@@ -2297,5 +2299,35 @@ public final class CarbonUtil {
     return dataAndIndexSize;
   }
 
+  /**
+   * Utility function to check whether table has timseries datamap or not
+   * @param carbonTable
+   * @return timeseries data map present
+   */
+  public static boolean hasTimeSeriesDataMap(CarbonTable carbonTable) {
+    List<DataMapSchema> dataMapSchemaList = carbonTable.getTableInfo().getDataMapSchemaList();
+    for (DataMapSchema dataMapSchema : dataMapSchemaList) {
+      if (dataMapSchema instanceof AggregationDataMapSchema) {
+        return ((AggregationDataMapSchema) dataMapSchema).isTimeseriesDataMap();
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Utility function to check whether table has aggregation datamap or not
+   * @param carbonTable
+   * @return timeseries data map present
+   */
+  public static boolean hasAggregationDataMap(CarbonTable carbonTable) {
+    List<DataMapSchema> dataMapSchemaList = carbonTable.getTableInfo().getDataMapSchemaList();
+    for (DataMapSchema dataMapSchema : dataMapSchemaList) {
+      if (dataMapSchema instanceof AggregationDataMapSchema) {
+        return true;
+      }
+    }
+    return false;
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/resources/timeseriestest.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/timeseriestest.csv b/integration/spark-common-test/src/test/resources/timeseriestest.csv
new file mode 100644
index 0000000..1674ac9
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/timeseriestest.csv
@@ -0,0 +1,7 @@
+mytime,name,age
+2016-2-23 01:01:30,vishal,10
+2016-2-23 01:01:40,kunal,20
+2016-2-23 01:01:50,shahid,30
+2016-2-23 01:02:30,kk,40
+2016-2-23 01:02:40,rahul,50
+2016-2-23 01:02:50,ravi,50
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index dc117a5..d84ec3b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.integration.spark.testsuite.preaggregate
 
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.CarbonRelation

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index b60e487..5cbcb26 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -1,7 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.carbondata.integration.spark.testsuite.timeseries
 
 import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, Ignore}
 
 class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
new file mode 100644
index 0000000..217edea
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.timeseries
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+@Ignore
+class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    sql("drop table if exists mainTable")
+    sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED BY 'org.apache.carbondata.format'")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='mytime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select mytime, sum(age) from mainTable group by mytime")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable")
+  }
+
+  test("test Year level timeseries data validation") {
+    checkAnswer( sql("select * from maintable_agg0_year"),
+      Seq(Row(Timestamp.valueOf("2016-01-01 00:00:00.0"),200)))
+  }
+
+  test("test month level timeseries data validation") {
+    checkAnswer( sql("select * from maintable_agg0_month"),
+      Seq(Row(Timestamp.valueOf("2016-02-01 00:00:00.0"),200)))
+  }
+
+  test("test day level timeseries data validation") {
+    checkAnswer( sql("select * from maintable_agg0_day"),
+      Seq(Row(Timestamp.valueOf("2016-02-23 00:00:00.0"),200)))
+  }
+
+  test("test hour level timeseries data validation") {
+    checkAnswer( sql("select * from maintable_agg0_hour"),
+      Seq(Row(Timestamp.valueOf("2016-02-23 01:00:00.0"),200)))
+  }
+
+  test("test minute level timeseries data validation") {
+    checkAnswer( sql("select * from maintable_agg0_minute"),
+      Seq(Row(Timestamp.valueOf("2016-02-23 01:01:00.0"),60),
+        Row(Timestamp.valueOf("2016-02-23 01:02:00.0"),140)))
+  }
+
+  test("test second level timeseries data validation") {
+    checkAnswer( sql("select * from maintable_agg0_second"),
+      Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"),10),
+        Row(Timestamp.valueOf("2016-02-23 01:01:40.0"),20),
+        Row(Timestamp.valueOf("2016-02-23 01:01:50.0"),30),
+        Row(Timestamp.valueOf("2016-02-23 01:02:30.0"),40),
+        Row(Timestamp.valueOf("2016-02-23 01:02:40.0"),50),
+        Row(Timestamp.valueOf("2016-02-23 01:02:50.0"),50)))
+  }
+
+  override def afterAll: Unit = {
+    sql("drop table if exists mainTable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
new file mode 100644
index 0000000..0990f87
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.timeseries
+
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+
+class TestTimeseriesTableSelection extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("CREATE TABLE mainTable(dataTime timestamp, name string, city string, age int) STORED BY 'org.apache.carbondata.format'")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select dataTime, sum(age) from mainTable group by dataTime")
+  }
+  test("test PreAggregate table selection 1") {
+    val df = sql("select dataTime from mainTable group by dataTime")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable")
+  }
+
+  test("test PreAggregate table selection 2") {
+    val df = sql("select timeseries(dataTime,'hour') from mainTable group by timeseries(dataTime,'hour')")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0_hour")
+  }
+
+  test("test PreAggregate table selection 3") {
+    val df = sql("select timeseries(dataTime,'milli') from mainTable group by timeseries(dataTime,'milli')")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable")
+  }
+
+  test("test PreAggregate table selection 4") {
+    val df = sql("select timeseries(dataTime,'year') from mainTable group by timeseries(dataTime,'year')")
+    preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_year")
+  }
+
+  test("test PreAggregate table selection 5") {
+    val df = sql("select timeseries(dataTime,'day') from mainTable group by timeseries(dataTime,'day')")
+    preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_day")
+  }
+
+  test("test PreAggregate table selection 6") {
+    val df = sql("select timeseries(dataTime,'month') from mainTable group by timeseries(dataTime,'month')")
+    preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_month")
+  }
+
+  test("test PreAggregate table selection 7") {
+    val df = sql("select timeseries(dataTime,'minute') from mainTable group by timeseries(dataTime,'minute')")
+    preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_minute")
+  }
+
+  test("test PreAggregate table selection 8") {
+    val df = sql("select timeseries(dataTime,'second') from mainTable group by timeseries(dataTime,'second')")
+    preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_second")
+  }
+
+  test("test PreAggregate table selection 9") {
+    val df = sql("select timeseries(dataTime,'hour') from mainTable where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour')")
+    preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
+  }
+
+  test("test PreAggregate table selection 10") {
+    val df = sql("select timeseries(dataTime,'hour') from mainTable where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')")
+    preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
+  }
+
+  test("test PreAggregate table selection 11") {
+    val df = sql("select timeseries(dataTime,'hour'),sum(age) from mainTable where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')")
+    preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
+  }
+
+  test("test PreAggregate table selection 12") {
+    val df = sql("select timeseries(dataTime,'hour')as hourlevel,sum(age) as sum from mainTable where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')")
+    preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
+  }
+
+  test("test PreAggregate table selection 13") {
+    val df = sql("select timeseries(dataTime,'hour')as hourlevel,sum(age) as sum from mainTable where timeseries(dataTime,'hour')='x' and name='vishal' group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')")
+    preAggTableValidator(df.queryExecution.analyzed,"maintable")
+  }
+
+  def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={
+    var isValidPlan = false
+    plan.transform {
+      // first check if any preaTable1 scala function is applied it is present is in plan
+      // then call is from create preaTable1regate table class so no need to transform the query plan
+      case ca:CarbonRelation =>
+        if (ca.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+          val relation = ca.asInstanceOf[CarbonDatasourceHadoopRelation]
+          if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) {
+            isValidPlan = true
+          }
+        }
+        ca
+      case logicalRelation:LogicalRelation =>
+        if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+          val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+          if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) {
+            isValidPlan = true
+          }
+        }
+        logicalRelation
+    }
+    if(!isValidPlan) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+  }
+
+  override def afterAll: Unit = {
+    sql("drop table if exists mainTable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 7cc3d11..5f78397 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.util.CarbonUtil
 
 case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
 
@@ -41,7 +42,11 @@ object CarbonSparkUtil {
             f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
                 !f.getDataType.isComplexType)
       }
-    CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
+    CarbonMetaData(dimensionsAttr,
+      measureAttr,
+      carbonTable,
+      DictionaryMap(dictionary.toMap),
+      CarbonUtil.hasAggregationDataMap(carbonTable))
   }
 
   def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index d68bc41..6317177 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.sql
 
-import java.sql.Timestamp
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.execution.command.timeseries.{TimeSeriesFunction}
 import org.apache.spark.sql.hive._
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -65,8 +64,7 @@ class CarbonEnv {
     sparkSession.udf.register("preAggLoad", () => "")
 
     // added for handling timeseries function like hour, minute, day , month , year
-    sparkSession.udf.register("timeseries", (timestamp: Timestamp, timeSeriesFunction: String) =>
-      TimeSeriesUtil.timeSeriesUDF(timestamp, timeSeriesFunction))
+    sparkSession.udf.register("timeseries", new TimeSeriesFunction)
     synchronized {
       if (!initialized) {
         // update carbon session parameters , preserve thread parameters

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
index 8e157fd..c1f9e8a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, ScalaUDF}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.command.DescribeTableCommand
 import org.apache.spark.sql.types.DataType
@@ -84,4 +84,18 @@ object CarbonExpressions {
       }
     }
   }
+
+  /**
+   * unapply method of Scala UDF
+   */
+  object CarbonScalaUDF {
+    def unapply(expression: Expression): Option[(ScalaUDF)] = {
+      expression match {
+        case a: ScalaUDF =>
+          Some(a)
+        case _ =>
+          None
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 4315e05..747e447 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -18,19 +18,15 @@
 package org.apache.spark.sql.execution.command.preaaggregate
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
-import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand}
-import org.apache.spark.sql.execution.command.AlterTableModel
-import org.apache.spark.sql.CarbonSession
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand}
+import org.apache.spark.sql.execution.command.AlterTableModel
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 
 object LoadPostAggregateListener extends OperationEventListener {
   /**
@@ -43,20 +39,47 @@ object LoadPostAggregateListener extends OperationEventListener {
     val sparkSession = loadEvent.sparkSession
     val carbonLoadModel = loadEvent.carbonLoadModel
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    if (table.hasDataMapSchema) {
-      for (dataMapSchema: DataMapSchema <- table.getTableInfo.getDataMapSchemaList.asScala) {
+    if (CarbonUtil.hasAggregationDataMap(table)) {
+      // getting all the aggergate datamap schema
+      val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
+        .filter(_.isInstanceOf[AggregationDataMapSchema])
+        .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
+      // sorting the datamap for timeseries rollup
+      val sortedList = aggregationDataMapList.sortBy(_.getOrdinal)
+      val parentTableName = table.getTableName
+      val databasename = table.getDatabaseName
+      val list = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema]
+      for (dataMapSchema: AggregationDataMapSchema <- sortedList) {
         val childTableName = dataMapSchema.getRelationIdentifier.getTableName
         val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
+        val childSelectQuery = if (!dataMapSchema.isTimeseriesDataMap) {
+          dataMapSchema.getProperties.get("CHILD_SELECT QUERY")
+        } else {
+          // for timeseries rollup policy
+          val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list,
+            dataMapSchema)
+          // if non of the rollup data map is selected hit the maintable and prepare query
+          if (tableSelectedForRollup.isEmpty) {
+            PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema,
+              parentTableName,
+              databasename)
+          } else {
+            // otherwise hit the select rollup datamap schema
+            PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema,
+              tableSelectedForRollup.get,
+              databasename)
+          }
+        }
         PreAggregateUtil.startDataLoadForDataMap(
             table,
             TableIdentifier(childTableName, Some(childDatabaseName)),
-            dataMapSchema.getProperties.get("CHILD_SELECT QUERY"),
+            childSelectQuery,
             carbonLoadModel.getSegmentId,
             validateSegments = false,
             sparkSession)
+        }
       }
     }
-  }
 }
 
 /**
@@ -74,7 +97,7 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen
     val carbonTable = compactionEvent.carbonTable
     val compactionType = compactionEvent.carbonMergerMapping.campactionType
     val sparkSession = compactionEvent.sparkSession
-    if (carbonTable.hasDataMapSchema) {
+    if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
       carbonTable.getTableInfo.getDataMapSchemaList.asScala.foreach { dataMapSchema =>
         val childRelationIdentifier = dataMapSchema.getRelationIdentifier
         val alterTableModel = AlterTableModel(Some(childRelationIdentifier.getDatabaseName),
@@ -120,7 +143,7 @@ object PreAggregateDataTypeChangePreListener extends OperationEventListener {
     val carbonTable = dataTypeChangePreListener.carbonTable
     val alterTableDataTypeChangeModel = dataTypeChangePreListener.alterTableDataTypeChangeModel
     val columnToBeAltered: String = alterTableDataTypeChangeModel.columnName
-    if (carbonTable.hasDataMapSchema) {
+    if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
       val dataMapSchemas = carbonTable.getTableInfo.getDataMapSchemaList
       dataMapSchemas.asScala.foreach { dataMapSchema =>
         val childColumns = dataMapSchema.getChildSchema.getListOfColumns
@@ -170,7 +193,7 @@ object PreAggregateDeleteSegmentByDatePreListener extends OperationEventListener
     val deleteSegmentByDatePreEvent = event.asInstanceOf[DeleteSegmentByDatePreEvent]
     val carbonTable = deleteSegmentByDatePreEvent.carbonTable
     if (carbonTable != null) {
-      if (carbonTable.hasDataMapSchema) {
+      if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
         throw new UnsupportedOperationException(
           "Delete segment operation is not supported on tables which have a pre-aggregate table. " +
           "Drop pre-aggregation table to continue")
@@ -194,7 +217,7 @@ object PreAggregateDeleteSegmentByIdPreListener extends OperationEventListener {
     val tableEvent = event.asInstanceOf[DeleteSegmentByIdPreEvent]
     val carbonTable = tableEvent.carbonTable
     if (carbonTable != null) {
-      if (carbonTable.hasDataMapSchema) {
+      if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
         throw new UnsupportedOperationException(
           "Delete segment operation is not supported on tables which have a pre-aggregate table")
       }
@@ -219,7 +242,7 @@ object PreAggregateDropColumnPreListener extends OperationEventListener {
     val carbonTable = dataTypeChangePreListener.carbonTable
     val alterTableDropColumnModel = dataTypeChangePreListener.alterTableDropColumnModel
     val columnsToBeDropped = alterTableDropColumnModel.columns
-    if (carbonTable.hasDataMapSchema) {
+    if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
       val dataMapSchemas = carbonTable.getTableInfo.getDataMapSchemaList
       dataMapSchemas.asScala.foreach { dataMapSchema =>
         val parentColumnNames = dataMapSchema.getChildSchema.getListOfColumns.asScala
@@ -257,7 +280,7 @@ object PreAggregateRenameTablePreListener extends OperationEventListener {
       throw new UnsupportedOperationException(
         "Rename operation for pre-aggregate table is not supported.")
     }
-    if (carbonTable.hasDataMapSchema) {
+    if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
       throw new UnsupportedOperationException(
         "Rename operation is not supported for table with pre-aggregate tables")
     }
@@ -275,7 +298,7 @@ object UpdatePreAggregatePreListener extends OperationEventListener {
     val tableEvent = event.asInstanceOf[UpdateTablePreEvent]
     val carbonTable = tableEvent.carbonTable
     if (carbonTable != null) {
-      if (carbonTable.hasDataMapSchema) {
+      if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
         throw new UnsupportedOperationException(
           "Update operation is not supported for tables which have a pre-aggregate table. Drop " +
           "pre-aggregate tables to continue.")
@@ -299,7 +322,7 @@ object DeletePreAggregatePreListener extends OperationEventListener {
     val tableEvent = event.asInstanceOf[DeleteFromTablePreEvent]
     val carbonTable = tableEvent.carbonTable
     if (carbonTable != null) {
-      if (carbonTable.hasDataMapSchema) {
+      if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
         throw new UnsupportedOperationException(
           "Delete operation is not supported for tables which have a pre-aggregate table. Drop " +
           "pre-aggregate tables to continue.")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 851b851..5ad5308 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.TableInfo
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -544,8 +544,10 @@ object PreAggregateUtil {
   def createChildSelectQuery(tableSchema: TableSchema, databaseName: String): String = {
     val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
     val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
-    tableSchema.getListOfColumns.asScala.foreach {
-      a => if (a.getAggFunction.nonEmpty) {
+    val columns = tableSchema.getListOfColumns.asScala
+      .filter(f => !f.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+    columns.foreach { a =>
+      if (a.getAggFunction.nonEmpty) {
         aggregateColumns += s"${a.getAggFunction match {
           case "count" => "sum"
           case _ => a.getAggFunction}}(${a.getColumnName})"
@@ -558,4 +560,101 @@ object PreAggregateUtil {
       groupingExpressions.mkString(",") }"
   }
 
+  /**
+   * Below method will be used to get the select query when rollup policy is
+   * applied in case of timeseries table
+   * @param tableSchema
+   *                    main data map schema
+   * @param selectedDataMapSchema
+   *                              selected data map schema for rollup
+   * @return select query based on rolloup
+   */
+  def createTimeseriesSelectQueryForRollup(
+      tableSchema: TableSchema,
+      selectedDataMapSchema: AggregationDataMapSchema,
+      databaseName: String): String = {
+    val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
+    val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
+    val columns = tableSchema.getListOfColumns.asScala
+      .filter(f => !f.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+    columns.foreach { a =>
+      if (a.getAggFunction.nonEmpty) {
+        aggregateColumns += s"${a.getAggFunction match {
+          case "count" => "sum"
+          case others@_ => others}}(${selectedDataMapSchema.getAggChildColByParent(
+          a.getParentColumnTableRelations.get(0).getColumnName, a.getAggFunction).getColumnName})"
+      } else if (a.getTimeSeriesFunction.nonEmpty) {
+        groupingExpressions += s"timeseries(${
+          selectedDataMapSchema
+            .getNonAggChildColBasedByParent(a.getParentColumnTableRelations.
+              get(0).getColumnName).getColumnName
+        } , '${ a.getTimeSeriesFunction }')"
+      } else {
+        groupingExpressions += selectedDataMapSchema
+          .getNonAggChildColBasedByParent(a.getParentColumnTableRelations.
+            get(0).getColumnName).getColumnName
+      }
+    }
+    s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",")
+    } from $databaseName.${selectedDataMapSchema.getChildSchema.getTableName } " +
+    s"group by ${ groupingExpressions.mkString(",") }"
+  }
+
+  /**
+   * Below method will be used to creating select query for timeseries
+   * for lowest level for aggergation like second level, in that case it will
+   * hit the maintable
+   * @param tableSchema
+   *                    data map schema
+   * @param parentTableName
+   *                        parent schema
+   * @return select query for loading
+   */
+  def createTimeSeriesSelectQueryFromMain(tableSchema: TableSchema,
+      parentTableName: String,
+      databaseName: String): String = {
+    val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
+    val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
+    val columns = tableSchema.getListOfColumns.asScala
+      .filter(f => !f.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+    columns.foreach {a =>
+        if (a.getAggFunction.nonEmpty) {
+          aggregateColumns +=
+          s"${ a.getAggFunction }(${ a.getParentColumnTableRelations.get(0).getColumnName })"
+        } else if (a.getTimeSeriesFunction.nonEmpty) {
+          groupingExpressions +=
+          s"timeseries(${ a.getParentColumnTableRelations.get(0).getColumnName },'${
+            a.getTimeSeriesFunction}')"
+        } else {
+          groupingExpressions += a.getParentColumnTableRelations.get(0).getColumnName
+        }
+    }
+    s"select ${ groupingExpressions.mkString(",") },${
+      aggregateColumns.mkString(",")
+    } from $databaseName.${ parentTableName } group by ${ groupingExpressions.mkString(",") }"
+
+  }
+    /**
+   * Below method will be used to select rollup table in case of
+   * timeseries data map loading
+   * @param list
+   *             list of timeseries datamap
+   * @param dataMapSchema
+   *                      datamap schema
+   * @return select table name
+   */
+  def getRollupDataMapNameForTimeSeries(
+      list: scala.collection.mutable.ListBuffer[AggregationDataMapSchema],
+      dataMapSchema: AggregationDataMapSchema): Option[AggregationDataMapSchema] = {
+    if (list.isEmpty) {
+      None
+    } else {
+      val rollupDataMapSchema = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema]
+      list.foreach{f =>
+        if (dataMapSchema.canSelectForRollup(f)) {
+          rollupDataMapSchema += f
+        } }
+      rollupDataMapSchema.lastOption
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala
new file mode 100644
index 0000000..ad9ace7
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.timeseries
+
+import java.sql.Timestamp
+
+import org.apache.carbondata.core.preagg.TimeSeriesUDF
+
+/**
+ * Time series udf class
+ */
+
+class TimeSeriesFunction extends Function2[Timestamp, String, Timestamp] with Serializable{
+
+  override def apply(v1: Timestamp,
+      v2: String): Timestamp = {
+    TimeSeriesUDF.INSTANCE.applyUDF(v1, v2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
index 9d4ce56..6a4ef56 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.sql.execution.command.timeseries
 
-import java.sql.Timestamp
-
 import org.apache.spark.sql.execution.command.{DataMapField, Field}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -142,18 +140,5 @@ object TimeSeriesUtil {
                      obj._2.aggregateFunction.isEmpty)
     isTimeSeriesColumnExits.get._2.aggregateFunction = timeSeriesFunction
   }
-
-  /**
-   * UDF for timeseries
-   *
-   * @param timestamp
-   *                  timestamp
-   * @param timeSeriesFunctionType
-   *                               time series function
-   * @return updated timestamp based on function
-   */
-  def timeSeriesUDF(timestamp: Timestamp, timeSeriesFunctionType: String): Timestamp = {
-    TimeSeriesUDF.INSTANCE.applyUDF(timestamp, timeSeriesFunctionType)
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index abc58ff..f7a1eed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSes
 import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.util.CarbonReflectionUtils
@@ -61,7 +61,8 @@ case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
 case class CarbonMetaData(dims: Seq[String],
     msrs: Seq[String],
     carbonTable: CarbonTable,
-    dictionaryMap: DictionaryMap)
+    dictionaryMap: DictionaryMap,
+    hasAggregateDataMapSchema: Boolean)
 
 case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   def get(name: String): Option[Boolean] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 09e66de..4227dcb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -22,22 +22,22 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql._
-import org.apache.spark.sql.CarbonExpressions.CarbonSubqueryAlias
+import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, NamedExpression, ScalaUDF, SortOrder}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.sql.CarbonExpressions.MatchCast
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
 import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
@@ -69,6 +69,9 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
  * 5. Order By Query rules.
  *    5.1 Update project list based on updated aggregate expression
  *    5.2 Update sort order attributes based on pre aggregate table
+ * 6. timeseries function
+ *    6.1 validate maintable has timeseries datamap
+ *    6.2 timeseries function is valid function or not
  *
  * @param sparkSession
  * spark session
@@ -115,8 +118,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           // only carbon query plan is supported checking whether logical relation is
           // is for carbon
           if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
-               .hasDataMapSchema =>
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+               metaData.hasAggregateDataMapSchema =>
           val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
           // if it is valid plan then extract the query columns
           isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
@@ -136,8 +139,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           // only carbon query plan is supported checking whether logical relation is
           // is for carbon
           if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
-               .hasDataMapSchema =>
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+               metaData.hasAggregateDataMapSchema =>
           val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
           // if it is valid plan then extract the query columns
           isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
@@ -148,11 +151,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
           // getting the columns from filter expression
           if(isValidPlan) {
-            filterExp.transform {
-              case attr: AttributeReference =>
-                list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true)
-                attr
-            }
+            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
           }
           carbonTable
 
@@ -162,8 +161,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           // only carbon query plan is supported checking whether logical relation is
           // is for carbon
           if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
-               .hasDataMapSchema =>
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+               metaData.hasAggregateDataMapSchema =>
           val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
           // if it is valid plan then extract the query columns
           isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
@@ -180,8 +179,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
               aggregateExp,
               CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
           if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
-               .hasDataMapSchema =>
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+               metaData.hasAggregateDataMapSchema =>
           val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
           isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
             aggregateExp,
@@ -201,8 +200,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
               aggregateExp,
               Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
           if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
-               .hasDataMapSchema =>
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+               metaData.hasAggregateDataMapSchema =>
           val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
           isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
             aggregateExp,
@@ -213,11 +212,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           if (isValidPlan) {
             list ++
             extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
-            filterExp.transform {
-              case attr: AttributeReference =>
-                list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true)
-                attr
-            }
+            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
           }
           carbonTable
         // case for handling aggregation with order by when only projection column exits
@@ -227,8 +222,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             aggregateExp,
             CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
           if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
-               .hasDataMapSchema =>
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+               metaData.hasAggregateDataMapSchema =>
           val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
           isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
             aggregateExp,
@@ -248,8 +243,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             aggregateExp,
             Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
           if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
-               .hasDataMapSchema =>
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+               metaData.hasAggregateDataMapSchema =>
           val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
           isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
             aggregateExp,
@@ -261,11 +256,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
               carbonTable = carbonTable,
               tableName = tableName)
-            filterExp.transform {
-              case attr: AttributeReference =>
-                list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true)
-                attr
-            }
+            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
           }
           carbonTable
         case _ =>
@@ -321,6 +312,65 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
 
   /**
+   * Below method will be used to extract the query columns from
+   * filter expression
+   * @param filterExp
+   *                  filter expression
+   * @param set
+   *             query column list
+   * @param carbonTable
+   *                    parent table
+   * @param tableName
+   *                  table name
+   * @return isvalid filter expression for aggregate
+   */
+  def extractQueryColumnFromFilterExp(filterExp: Expression,
+      set: scala.collection.mutable.HashSet[QueryColumn],
+      carbonTable: CarbonTable, tableName: String): Boolean = {
+    // map to maintain attribute reference present in the filter to timeseries function
+    // if applied this is added to avoid duplicate column
+    val mapOfColumnSeriesFun = scala.collection.mutable.HashMap.empty[AttributeReference, String]
+    var isValidPlan = true
+    filterExp.transform {
+      case attr: AttributeReference =>
+        if (!mapOfColumnSeriesFun.get(attr).isDefined) {
+          mapOfColumnSeriesFun.put(attr, null)
+        }
+        attr
+      case udf@CarbonScalaUDF(_) =>
+        // for handling timeseries function
+        if (udf.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase(
+          "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction") &&
+            CarbonUtil.hasTimeSeriesDataMap(carbonTable)) {
+          mapOfColumnSeriesFun.put(udf.children(0).asInstanceOf[AttributeReference],
+            udf.children(1).asInstanceOf[Literal].value.toString)
+        } else {
+          // for any other scala udf
+          udf.transform {
+            case attr: AttributeReference =>
+              if (!mapOfColumnSeriesFun.get(attr).isDefined) {
+                mapOfColumnSeriesFun.put(attr, null)
+              }
+              attr
+          }
+        }
+        udf
+    }
+    mapOfColumnSeriesFun.foreach { f =>
+        if (f._2 == null) {
+          set +=
+          getQueryColumn(f._1.name, carbonTable, tableName, isFilterColumn = true)
+        } else {
+          set += getQueryColumn(f._1.name,
+            carbonTable,
+            carbonTable.getTableName,
+            isFilterColumn = true,
+            timeseriesFunction = f._2)
+        }
+    }
+    isValidPlan
+  }
+  /**
    * Below method will be used to extract columns from order by expression
    * @param projectList
    *                    project list from plan
@@ -383,13 +433,18 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       attributeReference: AttributeReference,
       attributes: Seq[AttributeReference],
       aggFunction: String = "",
-      canBeNull: Boolean = false): AttributeReference = {
+      canBeNull: Boolean = false,
+      timeseriesFunction: String = ""): AttributeReference = {
     val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema];
-    val columnSchema = if (aggFunction.isEmpty) {
+    val columnSchema = if (aggFunction.isEmpty && timeseriesFunction.isEmpty) {
       aggregationDataMapSchema.getChildColByParentColName(attributeReference.name.toLowerCase)
-    } else {
+    } else if (!aggFunction.isEmpty) {
       aggregationDataMapSchema.getAggChildColByParent(attributeReference.name.toLowerCase,
-        aggFunction.toLowerCase)
+        aggFunction)
+    } else {
+      aggregationDataMapSchema
+        .getTimeseriesChildColByParent(attributeReference.name.toLowerCase,
+          timeseriesFunction)
     }
     // here column schema cannot be null, if it is null then aggregate table selection
     // logic has some problem
@@ -427,6 +482,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * 5. Order by plan rules.
    *    5.1 Update project list based on updated aggregate expression
    *    5.2 Update sort order attributes based on pre aggregate table
+   * 6. timeseries function
+   *    6.1 validate parent table has timeseries datamap
+   *    6.2 timeseries function is valid function or not
    *
    * @param logicalPlan
    * parent logical plan
@@ -444,7 +502,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       // case for aggregation query
       case Aggregate(grExp, aggExp, child@CarbonSubqueryAlias(_, l: LogicalRelation))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+             metaData.hasAggregateDataMapSchema =>
         val (updatedGroupExp, updatedAggExp, newChild, None) =
           getUpdatedExpressions(grExp,
             aggExp,
@@ -461,7 +520,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         aggExp,
         Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation)))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+             metaData.hasAggregateDataMapSchema =>
         val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
           getUpdatedExpressions(grExp,
             aggExp,
@@ -477,7 +537,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         // case for aggregation query
       case Aggregate(grExp, aggExp, l: LogicalRelation)
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+             metaData.hasAggregateDataMapSchema =>
         val (updatedGroupExp, updatedAggExp, newChild, None) =
           getUpdatedExpressions(grExp,
             aggExp,
@@ -497,7 +558,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             aggregateExp,
             subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+             metaData.hasAggregateDataMapSchema =>
         val (updatedGroupExp, updatedAggExp, newChild, None) =
           getUpdatedExpressions(groupingExp,
             aggregateExp,
@@ -520,7 +582,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             aggregateExp,
             Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+             metaData.hasAggregateDataMapSchema =>
         val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
           getUpdatedExpressions(groupingExp,
             aggregateExp,
@@ -544,8 +607,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           aggregateExp,
           subQuery@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
         if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
-             .hasDataMapSchema =>
+           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+             metaData.hasAggregateDataMapSchema =>
         val (updatedGroupExp, updatedAggExp, newChild, None) =
           getUpdatedExpressions(groupingExp,
             aggregateExp,
@@ -566,7 +629,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           aggregateExp,
           Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+             metaData.hasAggregateDataMapSchema =>
         val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
           getUpdatedExpressions(groupingExp,
             aggregateExp,
@@ -615,6 +679,11 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           alias.qualifier,
           alias.isGenerated)
         alias
+      case alias@Alias(exp: Expression, name) =>
+        updatedProjectList += AttributeReference(name, exp.dataType, exp.nullable)(alias.exprId,
+          alias.qualifier,
+          alias.isGenerated)
+        alias
     }
     }
     // getting the updated sort order
@@ -724,6 +793,31 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         Alias(aggExp,
           name)(NamedExpression.newExprId,
           alias.qualifier).asInstanceOf[NamedExpression]
+      case alias@Alias(expression: Expression, name) =>
+        val updatedExp =
+          if (expression.isInstanceOf[ScalaUDF] &&
+              expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase(
+                "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction")) {
+          expression.asInstanceOf[ScalaUDF].transform {
+            case attr: AttributeReference =>
+            val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
+              attr,
+              attributes,
+              timeseriesFunction =
+                expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal].value.toString)
+            childAttributeReference
+          }
+        } else {
+          expression.transform{
+            case attr: AttributeReference =>
+              val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
+                attr,
+                attributes)
+              childAttributeReference
+          }
+        }
+        Alias(updatedExp, name)(NamedExpression.newExprId,
+          alias.qualifier).asInstanceOf[NamedExpression]
     }
     // transformaing the logical relation
     val newChild = child.transform {
@@ -763,11 +857,18 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       // in case of alias we need to match with alias name and when alias is not present
       // we need to compare with attribute reference name
       case alias@Alias(attr: AttributeReference, name)
-        if attr.name.equals(sortOrderAttr.name) || name.equals(sortOrderAttr.name) =>
+        if attr.name.equalsIgnoreCase(sortOrderAttr.name) ||
+           name.equalsIgnoreCase(sortOrderAttr.name) =>
           AttributeReference(name,
-            attr.dataType,
-            attr.nullable,
-            attr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated)
+            sortOrderAttr.dataType,
+            sortOrderAttr.nullable,
+            sortOrderAttr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated)
+      case alias@Alias(_: Expression, name)
+        if name.equalsIgnoreCase(sortOrderAttr.name) =>
+          AttributeReference(name,
+          sortOrderAttr.dataType,
+          sortOrderAttr.nullable,
+          sortOrderAttr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated)
     }
     // any case it will match the condition, so no need to check whether updated expression is empty
     // or not
@@ -933,7 +1034,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       case attr: AttributeReference =>
         set += getQueryColumn(attr.name,
           carbonTable,
-          tableName);
+          tableName)
       case Alias(attr: AttributeReference, _) =>
         set += getQueryColumn(attr.name,
           carbonTable,
@@ -950,6 +1051,26 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         } else {
           return false
         }
+      case Alias(expression: Expression, _) =>
+        if (expression.isInstanceOf[ScalaUDF] &&
+            expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase(
+              "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction") &&
+            CarbonUtil.hasTimeSeriesDataMap(carbonTable)) {
+          set += getQueryColumn(expression.asInstanceOf[ScalaUDF].children(0)
+            .asInstanceOf[AttributeReference].name,
+            carbonTable,
+            tableName,
+            timeseriesFunction = expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal]
+              .value.toString)
+        } else {
+          expression.transform {
+            case attr: AttributeReference =>
+              set += getQueryColumn(attr.name,
+                carbonTable,
+                tableName)
+              attr
+          }
+        }
     }
     true
   }
@@ -1048,6 +1169,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
     }
   }
 
+
+
   /**
    * Below method will be used to get the query column object which
    * will have details of the column and its property
@@ -1074,7 +1197,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       aggFunction: String = "",
       dataType: String = "",
       isChangedDataType: Boolean = false,
-      isFilterColumn: Boolean = false): QueryColumn = {
+      isFilterColumn: Boolean = false,
+      timeseriesFunction: String = ""): QueryColumn = {
     val columnSchema = carbonTable.getColumnByName(tableName, columnName.toLowerCase)
     if(null == columnSchema) {
       null
@@ -1083,11 +1207,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         new QueryColumn(columnSchema.getColumnSchema,
           columnSchema.getDataType.getName,
           aggFunction.toLowerCase,
-          isFilterColumn)
+          isFilterColumn,
+          timeseriesFunction.toLowerCase)
       } else {
         new QueryColumn(columnSchema.getColumnSchema,
-        CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType),
-        aggFunction.toLowerCase, isFilterColumn)
+          CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType),
+          aggFunction.toLowerCase,
+          isFilterColumn,
+          timeseriesFunction.toLowerCase)
       }
     }
   }