You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/07 02:59:14 UTC

carbondata git commit: [CARBONDATA-1518][Pre-Aggregate]Support creating timeseries while creating main table.

Repository: carbondata
Updated Branches:
  refs/heads/master 0e8707a60 -> 49763b72b


[CARBONDATA-1518][Pre-Aggregate]Support creating timeseries while creating main table.

This closes #1565


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/49763b72
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/49763b72
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/49763b72

Branch: refs/heads/master
Commit: 49763b72bce8f38404e693b39bb440acb04e601f
Parents: 0e8707a
Author: kumarvishal <ku...@gmail.com>
Authored: Tue Dec 5 16:00:48 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Dec 7 08:29:01 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 +
 .../ThriftWrapperSchemaConverterImpl.java       |  15 +-
 .../schema/table/column/ColumnSchema.java       |  20 +++
 .../core/preagg/TimeSeriesFunction.java         |  40 +++++
 .../carbondata/core/preagg/TimeSeriesUDF.java   | 127 +++++++++++++++
 .../util/AbstractDataFileFooterConverter.java   |  11 +-
 .../timeseries/TestTimeSeriesCreateTable.scala  |  93 +++++++++++
 .../command/carbonTableSchemaCommon.scala       |   2 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   6 +
 .../datamap/CarbonCreateDataMapCommand.scala    |  33 +++-
 .../CreatePreAggregateTableCommand.scala        |  13 +-
 .../preaaggregate/PreAggregateUtil.scala        |   2 +-
 .../command/timeseries/TimeseriesUtil.scala     | 159 +++++++++++++++++++
 13 files changed, 513 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 43985b2..72d8b0c 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1440,6 +1440,10 @@ public final class CarbonCommonConstants {
    */
   public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;
 
+  public static final String TIMESERIES_EVENTTIME = "timeseries.eventtime";
+
+  public static final String TIMESERIES_HIERARCHY = "timeseries.hierarchy";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 8a24e38..c1e68da 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+import org.apache.carbondata.core.preagg.TimeSeriesUDF;
 
 /**
  * Thrift schema to carbon schema converter and vice versa
@@ -198,6 +199,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
       thriftColumnSchema.setColumnProperties(properties);
     }
     thriftColumnSchema.setAggregate_function(wrapperColumnSchema.getAggFunction());
+    if (null != wrapperColumnSchema.getTimeSeriesFunction() && !wrapperColumnSchema
+        .getTimeSeriesFunction().isEmpty()) {
+      thriftColumnSchema.setAggregate_function(wrapperColumnSchema.getTimeSeriesFunction());
+    }
     List<ParentColumnTableRelation> parentColumnTableRelations =
         wrapperColumnSchema.getParentColumnTableRelations();
     if (null != parentColumnTableRelations) {
@@ -518,7 +523,15 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         wrapperColumnSchema.setSortColumn(true);
       }
     }
-    wrapperColumnSchema.setAggFunction(externalColumnSchema.getAggregate_function());
+    if (null != externalColumnSchema.getAggregate_function().toLowerCase()) {
+      if (TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION
+          .contains(externalColumnSchema.getAggregate_function().toLowerCase())) {
+        wrapperColumnSchema
+            .setTimeSeriesFunction(externalColumnSchema.getAggregate_function().toLowerCase());
+      } else {
+        wrapperColumnSchema.setAggFunction(externalColumnSchema.getAggregate_function());
+      }
+    }
     List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation =
         externalColumnSchema.getParentColumnTableRelations();
     if (null != parentColumnTableRelation) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index ea7005f..edae4d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -126,9 +126,17 @@ public class ColumnSchema implements Serializable, Writable {
    */
   private String aggFunction = "";
 
+  /**
+   * list of parent column relations
+   */
   private List<ParentColumnTableRelation> parentColumnTableRelations;
 
   /**
+   * timeseries function applied on column
+   */
+  private String timeSeriesFunction = "";
+
+  /**
    * @return the columnName
    */
   public String getColumnName() {
@@ -439,6 +447,16 @@ public class ColumnSchema implements Serializable, Writable {
     this.aggFunction = aggFunction;
   }
 
+  public String getTimeSeriesFunction() {
+    return timeSeriesFunction;
+  }
+
+  public void setTimeSeriesFunction(String timeSeriesFunction) {
+    if (null != timeSeriesFunction) {
+      this.timeSeriesFunction = timeSeriesFunction;
+    }
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeShort(dataType.getId());
@@ -476,6 +494,7 @@ public class ColumnSchema implements Serializable, Writable {
     out.writeBoolean(invisible);
     out.writeBoolean(isSortColumn);
     out.writeUTF(null != aggFunction ? aggFunction : "");
+    out.writeUTF(timeSeriesFunction);
     boolean isParentTableColumnRelationExists =
         null != parentColumnTableRelations && parentColumnTableRelations.size() > 0;
     out.writeBoolean(isParentTableColumnRelationExists);
@@ -521,6 +540,7 @@ public class ColumnSchema implements Serializable, Writable {
     this.invisible = in.readBoolean();
     this.isSortColumn = in.readBoolean();
     this.aggFunction = in.readUTF();
+    this.timeSeriesFunction = in.readUTF();
     boolean isParentTableColumnRelationExists = in.readBoolean();
     if (isParentTableColumnRelationExists) {
       short parentColumnTableRelationSize = in.readShort();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java
new file mode 100644
index 0000000..02ff753
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.preagg;
+
+/**
+ * enum for timeseries function
+ */
+public enum TimeSeriesFunction {
+  SECOND("second"),
+  MINUTE("minute"),
+  HOUR("hour"),
+  DAY("day"),
+  MONTH("month"),
+  YEAR("year");
+
+  private String name;
+
+  TimeSeriesFunction(String name) {
+    this.name = name;
+  }
+
+  public String getName() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
new file mode 100644
index 0000000..50cb052
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.preagg;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+/**
+ * class for applying timeseries udf
+ */
+public class TimeSeriesUDF {
+
+  public final List<String> TIMESERIES_FUNCTION = new ArrayList<>();
+
+  // thread local for keeping calender instance
+  private ThreadLocal<Calendar> calanderThreadLocal = new ThreadLocal<>();
+
+  /**
+   * singleton instance
+   */
+  public static final TimeSeriesUDF INSTANCE = new TimeSeriesUDF();
+
+  private TimeSeriesUDF() {
+    initialize();
+  }
+
+  /**
+   * Below method will be used to apply udf on data provided
+   * Method will work based on below logic.
+   * Data: 2016-7-23 01:01:30,10
+   * Year Level UDF will return: 2016-1-1 00:00:00,0
+   * Month Level UDF will return: 2016-7-1 00:00:00,0
+   * Day Level UDF will return: 2016-7-23 00:00:00,0
+   * Hour Level UDF will return: 2016-7-23 01:00:00,0
+   * Minute Level UDF will return: 2016-7-23 01:01:00,0
+   * Second Level UDF will return: 2016-7-23 01:01:30,0
+   * If function does not match with any of the above functions
+   * it will throw IllegalArgumentException
+   *
+   * @param data     timestamp data
+   * @param function time series function name
+   * @return data after applying udf
+   */
+  public Timestamp applyUDF(Timestamp data, String function) {
+    if (null == data) {
+      return data;
+    }
+    initialize();
+    Calendar calendar = calanderThreadLocal.get();
+    calendar.clear();
+    calendar.setTimeInMillis(data.getTime());
+    TimeSeriesFunction timeSeriesFunction = TimeSeriesFunction.valueOf(function);
+    switch (timeSeriesFunction) {
+      case SECOND:
+        calendar.set(Calendar.MILLISECOND, 0);
+        break;
+      case MINUTE:
+        calendar.set(Calendar.MILLISECOND, 0);
+        calendar.set(Calendar.SECOND, 0);
+        break;
+      case HOUR:
+        calendar.set(Calendar.MILLISECOND, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        break;
+      case DAY:
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+        break;
+      case MONTH:
+        calendar.set(Calendar.MILLISECOND, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.HOUR, 0);
+        calendar.set(Calendar.DAY_OF_MONTH, 1);
+        break;
+      case YEAR:
+        calendar.set(Calendar.MONTH, 1);
+        calendar.set(Calendar.DAY_OF_YEAR, 1);
+        calendar.set(Calendar.HOUR, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid timeseries function name: " + function);
+    }
+    data.setTime(calendar.getTimeInMillis());
+    return data;
+  }
+
+  /**
+   * Below method will be used to initialize the thread local
+   */
+  private synchronized void initialize() {
+    if (calanderThreadLocal.get() == null) {
+      calanderThreadLocal.set(new GregorianCalendar());
+    }
+    if (TIMESERIES_FUNCTION.isEmpty()) {
+      TIMESERIES_FUNCTION.add("second");
+      TIMESERIES_FUNCTION.add("minute");
+      TIMESERIES_FUNCTION.add("hour");
+      TIMESERIES_FUNCTION.add("day");
+      TIMESERIES_FUNCTION.add("month");
+      TIMESERIES_FUNCTION.add("year");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index b9ec3f1..f65e98d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+import org.apache.carbondata.core.preagg.TimeSeriesUDF;
 import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.BlockIndex;
@@ -289,7 +290,15 @@ public abstract class AbstractDataFileFooterConverter {
         wrapperColumnSchema.setSortColumn(true);
       }
     }
-    wrapperColumnSchema.setAggFunction(externalColumnSchema.getAggregate_function());
+    if (null != externalColumnSchema.getAggregate_function()) {
+      if (TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION
+          .contains(externalColumnSchema.getAggregate_function().toLowerCase())) {
+        wrapperColumnSchema
+            .setTimeSeriesFunction(externalColumnSchema.getAggregate_function().toLowerCase());
+      } else {
+        wrapperColumnSchema.setAggFunction(externalColumnSchema.getAggregate_function());
+      }
+    }
     List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation =
         externalColumnSchema.getParentColumnTableRelations();
     if (null != parentColumnTableRelation) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
new file mode 100644
index 0000000..b60e487
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -0,0 +1,93 @@
+package org.apache.carbondata.integration.spark.testsuite.timeseries
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("CREATE TABLE mainTable(dataTime timestamp, name string, city string, age int) STORED BY 'org.apache.carbondata.format'")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='second=1,hour=1,day=1,month=1,year=1') as select dataTime, sum(age) from mainTable group by dataTime")
+  }
+
+  test("test timeseries create table Zero") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_second"), true, "maintable_agg0_second")
+    sql("drop datamap agg0_second on table mainTable")
+  }
+
+  test("test timeseries create table One") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_hour"), true, "maintable_agg0_hour")
+    sql("drop datamap agg0_hour on table mainTable")
+  }
+  test("test timeseries create table two") {
+    checkExistence(sql("DESCRIBE FORMATTED maintable_agg0_day"), true, "maintable_agg0_day")
+    sql("drop datamap agg0_day on table mainTable")
+  }
+  test("test timeseries create table three") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_month"), true, "maintable_agg0_month")
+    sql("drop datamap agg0_month on table mainTable")
+  }
+  test("test timeseries create table four") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_year"), true, "maintable_agg0_year")
+    sql("drop datamap agg0_year on table mainTable")
+  }
+
+  test("test timeseries create table five") {
+    try {
+      sql(
+        "create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='sec=1,hour=1,day=1,month=1,year=1') as select dataTime, sum(age) from mainTable group by dataTime")
+      assert(false)
+    } catch {
+      case _:Exception =>
+        assert(true)
+    }
+  }
+
+  test("test timeseries create table Six") {
+    try {
+      sql(
+        "create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='hour=2') as select dataTime, sum(age) from mainTable group by dataTime")
+      assert(false)
+    } catch {
+      case _:Exception =>
+        assert(true)
+    }
+  }
+
+  test("test timeseries create table seven") {
+    try {
+      sql(
+        "create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='hour=1,day=1,year=1,month=1') as select dataTime, sum(age) from mainTable group by dataTime")
+      assert(false)
+    } catch {
+      case _:Exception =>
+        assert(true)
+    }
+  }
+
+  test("test timeseries create table Eight") {
+    try {
+      sql(
+        "create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='name', 'timeseries.hierarchy'='hour=1,day=1,year=1,month=1') as select name, sum(age) from mainTable group by name")
+      assert(false)
+    } catch {
+      case _:Exception =>
+        assert(true)
+    }
+  }
+
+  test("test timeseries create table Nine") {
+    try {
+      sql(
+        "create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='hour=1,day=1,year=1,month=1') as select name, sum(age) from mainTable group by name")
+      assert(false)
+    } catch {
+      case _:Exception =>
+        assert(true)
+    }
+  }
+  override def afterAll: Unit = {
+    sql("drop table if exists mainTable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 44f577d..37663ea 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -76,7 +76,7 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri
   override def hashCode : Int = column.hashCode
 }
 
-case class DataMapField(aggregateFunction: String = "",
+case class DataMapField(var aggregateFunction: String = "",
     columnTableRelation: Option[ColumnTableRelation] = None) {
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 53b20c2..d68bc41 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql
 
+import java.sql.Timestamp
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 import org.apache.spark.sql.hive._
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -61,6 +63,10 @@ class CarbonEnv {
     // only then the CarbonPreAggregateDataLoadingRules would be applied to split the average
     // column to sum and count.
     sparkSession.udf.register("preAggLoad", () => "")
+
+    // added for handling timeseries function like hour, minute, day , month , year
+    sparkSession.udf.register("timeseries", (timestamp: Timestamp, timeSeriesFunction: String) =>
+      TimeSeriesUtil.timeSeriesUDF(timestamp, timeSeriesFunction))
     synchronized {
       if (!initialized) {
         // update carbon session parameters , preserve thread parameters

http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index f90abb8..a3aa36d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -22,8 +22,10 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -50,13 +52,30 @@ case class CarbonCreateDataMapCommand(
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
         dmClassName.equalsIgnoreCase("preaggregate")) {
-      CreatePreAggregateTableCommand(
-        dataMapName,
-        tableIdentifier,
-        dmClassName,
-        dmproperties,
-        queryString.get
-      ).processMetadata(sparkSession)
+      val timeHierarchyString = dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY)
+      if (timeHierarchyString.isDefined) {
+        val details = TimeSeriesUtil
+          .validateAndGetTimeSeriesHierarchyDetails(
+            timeHierarchyString.get)
+        val updatedDmProperties = dmproperties - CarbonCommonConstants.TIMESERIES_HIERARCHY
+        details.foreach { f =>
+          CreatePreAggregateTableCommand(dataMapName + '_' + f._1,
+            tableIdentifier,
+            dmClassName,
+            updatedDmProperties,
+            queryString.get,
+            Some(f._1)).run(sparkSession)
+        }
+      }
+      else {
+        CreatePreAggregateTableCommand(
+          dataMapName,
+          tableIdentifier,
+          dmClassName,
+          dmproperties,
+          queryString.get
+        ).processMetadata(sparkSession)
+      }
     } else {
       val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
       dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 1ebf511..1c23d3a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -44,7 +45,8 @@ case class CreatePreAggregateTableCommand(
     parentTableIdentifier: TableIdentifier,
     dmClassName: String,
     dmProperties: Map[String, String],
-    queryString: String)
+    queryString: String,
+    timeSeriesFunction: Option[String] = None)
   extends AtomicRunnableCommand {
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
@@ -74,6 +76,15 @@ case class CreatePreAggregateTableCommand(
     // updating the relation identifier, this will be stored in child table
     // which can be used during dropping of pre-aggreate table as parent table will
     // also get updated
+    if(timeSeriesFunction.isDefined) {
+      TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties, parentTable)
+      TimeSeriesUtil.validateEventTimeColumnExitsInSelect(
+        fieldRelationMap,
+        dmProperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME).get)
+      TimeSeriesUtil.updateTimeColumnSelect(fieldRelationMap,
+        dmProperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME).get,
+      timeSeriesFunction.get)
+    }
     tableModel.parentTable = Some(parentTable)
     tableModel.dataMapRelation = Some(fieldRelationMap)
     val tablePath =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 95a711e..c602b0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -237,7 +237,7 @@ object PreAggregateUtil {
           parentTableName,
           parentDatabaseName, parentTableId = parentTableId)
       case Average(attr: AttributeReference) =>
-        getField(attr.name,
+        list += getField(attr.name,
           attr.dataType,
           "sum",
           carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
new file mode 100644
index 0000000..9d4ce56
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.timeseries
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.execution.command.{DataMapField, Field}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.preagg.TimeSeriesUDF
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/**
+ * Utility class for time series to keep
+ */
+object TimeSeriesUtil {
+
+  /**
+   * Below method will be used to validate whether column mentioned in time series
+   * is timestamp column or not
+   *
+   * @param dmproperties
+   * data map properties
+   * @param parentTable
+   * parent table
+   * @return whether time stamp column
+   */
+  def validateTimeSeriesEventTime(dmproperties: Map[String, String],
+      parentTable: CarbonTable) {
+    val eventTime = dmproperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME)
+    if (!eventTime.isDefined) {
+      throw new MalformedCarbonCommandException("Eventtime not defined in time series")
+    } else {
+      val carbonColumn = parentTable.getColumnByName(parentTable.getTableName, eventTime.get)
+      if (carbonColumn.getDataType != DataTypes.TIMESTAMP) {
+        throw new MalformedCarbonCommandException(
+          "Timeseries event time is only supported on Timestamp " +
+          "column")
+      }
+    }
+  }
+
+  /**
+   * Below method will be used to validate the hierarchy of time series and its value
+   * validation will be done whether hierarchy order is proper or not and hierarchy level
+   * value
+   *
+   * @param timeSeriesHierarchyDetails
+   * time series hierarchy string
+   */
+  def validateAndGetTimeSeriesHierarchyDetails(timeSeriesHierarchyDetails: String): Array[
+    (String, String)] = {
+    val updatedtimeSeriesHierarchyDetails = timeSeriesHierarchyDetails.toLowerCase
+    val timeSeriesHierarchy = updatedtimeSeriesHierarchyDetails.split(",")
+    val hierBuffer = timeSeriesHierarchy.map {
+      case f =>
+        val splits = f.split("=")
+        // checking hierarchy name is valid or not
+        if (!TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION.contains(splits(0).toLowerCase)) {
+          throw new MalformedCarbonCommandException(s"Not supported heirarchy type: ${ splits(0) }")
+
+        }
+        // validating hierarchy level is valid or not
+        if (!splits(1).equals("1")) {
+          throw new MalformedCarbonCommandException(
+            s"Unsupported Value for hierarchy:" +
+            s"${ splits(0) }=${ splits(1) }")
+        }
+        (splits(0), splits(1))
+    }
+    // checking whether hierarchy is in proper order or not
+    // get the index of first hierarchy
+    val indexOfFirstHierarchy = TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION
+      .indexOf(hierBuffer(0)._1.toLowerCase)
+    val index = 0
+    // now iterating through complete hierarchy to check any of the hierarchy index
+    // is less than first one
+    for (index <- 1 to hierBuffer.size - 1) {
+      val currentIndex = TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION
+        .indexOf(hierBuffer(index)._1.toLowerCase)
+      if (currentIndex < indexOfFirstHierarchy) {
+        throw new MalformedCarbonCommandException(s"$timeSeriesHierarchyDetails is in wrong order")
+      }
+    }
+    hierBuffer
+  }
+
+  /**
+   * Below method will be used to validate whether timeseries column present in
+   * select statement or not
+   * @param fieldMapping
+   *                     fields from select plan
+   * @param timeSeriesColumn
+   *                         timeseries column name
+   */
+  def validateEventTimeColumnExitsInSelect(fieldMapping: scala.collection.mutable
+  .LinkedHashMap[Field, DataMapField],
+      timeSeriesColumn: String) : Any = {
+    val isTimeSeriesColumnExits = fieldMapping
+      .exists(obj => obj._2.columnTableRelation.isDefined &&
+                     obj._2.columnTableRelation.get.parentColumnName
+                       .equalsIgnoreCase(timeSeriesColumn) &&
+                     obj._2.aggregateFunction.isEmpty)
+    if(!isTimeSeriesColumnExits) {
+      throw new MalformedCarbonCommandException(s"Time series column ${ timeSeriesColumn } does " +
+                                                s"not exists in select")
+    }
+  }
+
+  /**
+   * Below method will be used to validate whether timeseries column present in
+   * select statement or not
+   * @param fieldMapping
+   *                     fields from select plan
+   * @param timeSeriesColumn
+   *                         timeseries column name
+   */
+  def updateTimeColumnSelect(fieldMapping: scala.collection.mutable
+  .LinkedHashMap[Field, DataMapField],
+      timeSeriesColumn: String,
+      timeSeriesFunction: String) : Any = {
+    val isTimeSeriesColumnExits = fieldMapping
+      .find(obj => obj._2.columnTableRelation.isDefined &&
+                     obj._2.columnTableRelation.get.parentColumnName
+                       .equalsIgnoreCase(timeSeriesColumn) &&
+                     obj._2.aggregateFunction.isEmpty)
+    isTimeSeriesColumnExits.get._2.aggregateFunction = timeSeriesFunction
+  }
+
+  /**
+   * UDF for timeseries
+   *
+   * @param timestamp
+   *                  timestamp
+   * @param timeSeriesFunctionType
+   *                               time series function
+   * @return updated timestamp based on function
+   */
+  def timeSeriesUDF(timestamp: Timestamp, timeSeriesFunctionType: String): Timestamp = {
+    TimeSeriesUDF.INSTANCE.applyUDF(timestamp, timeSeriesFunctionType)
+  }
+}
+