You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/07 02:59:14 UTC
carbondata git commit: [CARBONDATA-1518][Pre-Aggregate]Support
creating timeseries while creating main table.
Repository: carbondata
Updated Branches:
refs/heads/master 0e8707a60 -> 49763b72b
[CARBONDATA-1518][Pre-Aggregate]Support creating timeseries while creating main table.
This closes #1565
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/49763b72
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/49763b72
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/49763b72
Branch: refs/heads/master
Commit: 49763b72bce8f38404e693b39bb440acb04e601f
Parents: 0e8707a
Author: kumarvishal <ku...@gmail.com>
Authored: Tue Dec 5 16:00:48 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Dec 7 08:29:01 2017 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 4 +
.../ThriftWrapperSchemaConverterImpl.java | 15 +-
.../schema/table/column/ColumnSchema.java | 20 +++
.../core/preagg/TimeSeriesFunction.java | 40 +++++
.../carbondata/core/preagg/TimeSeriesUDF.java | 127 +++++++++++++++
.../util/AbstractDataFileFooterConverter.java | 11 +-
.../timeseries/TestTimeSeriesCreateTable.scala | 93 +++++++++++
.../command/carbonTableSchemaCommon.scala | 2 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 6 +
.../datamap/CarbonCreateDataMapCommand.scala | 33 +++-
.../CreatePreAggregateTableCommand.scala | 13 +-
.../preaaggregate/PreAggregateUtil.scala | 2 +-
.../command/timeseries/TimeseriesUtil.scala | 159 +++++++++++++++++++
13 files changed, 513 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 43985b2..72d8b0c 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1440,6 +1440,10 @@ public final class CarbonCommonConstants {
*/
public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;
+ public static final String TIMESERIES_EVENTTIME = "timeseries.eventtime";
+
+ public static final String TIMESERIES_HIERARCHY = "timeseries.hierarchy";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 8a24e38..c1e68da 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+import org.apache.carbondata.core.preagg.TimeSeriesUDF;
/**
* Thrift schema to carbon schema converter and vice versa
@@ -198,6 +199,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
thriftColumnSchema.setColumnProperties(properties);
}
thriftColumnSchema.setAggregate_function(wrapperColumnSchema.getAggFunction());
+ if (null != wrapperColumnSchema.getTimeSeriesFunction() && !wrapperColumnSchema
+ .getTimeSeriesFunction().isEmpty()) {
+ thriftColumnSchema.setAggregate_function(wrapperColumnSchema.getTimeSeriesFunction());
+ }
List<ParentColumnTableRelation> parentColumnTableRelations =
wrapperColumnSchema.getParentColumnTableRelations();
if (null != parentColumnTableRelations) {
@@ -518,7 +523,15 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
wrapperColumnSchema.setSortColumn(true);
}
}
- wrapperColumnSchema.setAggFunction(externalColumnSchema.getAggregate_function());
+ if (null != externalColumnSchema.getAggregate_function().toLowerCase()) {
+ if (TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION
+ .contains(externalColumnSchema.getAggregate_function().toLowerCase())) {
+ wrapperColumnSchema
+ .setTimeSeriesFunction(externalColumnSchema.getAggregate_function().toLowerCase());
+ } else {
+ wrapperColumnSchema.setAggFunction(externalColumnSchema.getAggregate_function());
+ }
+ }
List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation =
externalColumnSchema.getParentColumnTableRelations();
if (null != parentColumnTableRelation) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index ea7005f..edae4d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -126,9 +126,17 @@ public class ColumnSchema implements Serializable, Writable {
*/
private String aggFunction = "";
+ /**
+ * list of parent column relations
+ */
private List<ParentColumnTableRelation> parentColumnTableRelations;
/**
+ * timeseries function applied on column
+ */
+ private String timeSeriesFunction = "";
+
+ /**
* @return the columnName
*/
public String getColumnName() {
@@ -439,6 +447,16 @@ public class ColumnSchema implements Serializable, Writable {
this.aggFunction = aggFunction;
}
+  /** @return timeseries function applied on this column ("" when none is set) */
+  public String getTimeSeriesFunction() { return timeSeriesFunction; }
+
+  /** Sets the timeseries function; a null argument leaves the current value unchanged. */
+  public void setTimeSeriesFunction(String timeSeriesFunction) {
+    // keep the field non-null: write() serializes it with writeUTF, which rejects null
+    this.timeSeriesFunction =
+        null == timeSeriesFunction ? this.timeSeriesFunction : timeSeriesFunction;
+  }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeShort(dataType.getId());
@@ -476,6 +494,7 @@ public class ColumnSchema implements Serializable, Writable {
out.writeBoolean(invisible);
out.writeBoolean(isSortColumn);
out.writeUTF(null != aggFunction ? aggFunction : "");
+ out.writeUTF(timeSeriesFunction);
boolean isParentTableColumnRelationExists =
null != parentColumnTableRelations && parentColumnTableRelations.size() > 0;
out.writeBoolean(isParentTableColumnRelationExists);
@@ -521,6 +540,7 @@ public class ColumnSchema implements Serializable, Writable {
this.invisible = in.readBoolean();
this.isSortColumn = in.readBoolean();
this.aggFunction = in.readUTF();
+ this.timeSeriesFunction = in.readUTF();
boolean isParentTableColumnRelationExists = in.readBoolean();
if (isParentTableColumnRelationExists) {
short parentColumnTableRelationSize = in.readShort();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java
new file mode 100644
index 0000000..02ff753
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.preagg;
+
+/**
+ * Time granularities supported by timeseries pre-aggregation, declared from the finest
+ * (second) to the coarsest (year). Each constant carries its lower-case name.
+ */
+public enum TimeSeriesFunction {
+  SECOND("second"),
+  MINUTE("minute"),
+  HOUR("hour"),
+  DAY("day"),
+  MONTH("month"),
+  YEAR("year");
+
+  private final String name;
+
+  TimeSeriesFunction(String levelName) {
+    this.name = levelName;
+  }
+
+  /** @return lower-case name of this granularity, e.g. "hour" */
+  public String getName() { return this.name; }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
new file mode 100644
index 0000000..50cb052
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.preagg;
+
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+/**
+ * Timeseries "UDF": truncates a timestamp down to the start of a time granularity
+ * (second, minute, hour, day, month or year).
+ */
+public class TimeSeriesUDF {
+
+  /**
+   * Lower-case names of the supported timeseries functions, ordered from the finest to the
+   * coarsest granularity. The order matters: hierarchy validation compares positions in this
+   * list. The list is unmodifiable.
+   */
+  public final List<String> TIMESERIES_FUNCTION = Collections.unmodifiableList(
+      Arrays.asList("second", "minute", "hour", "day", "month", "year"));
+
+  /**
+   * Calendar instances are mutable and not thread-safe, so each thread gets its own.
+   */
+  private final ThreadLocal<Calendar> calendarThreadLocal = new ThreadLocal<Calendar>() {
+    @Override
+    protected Calendar initialValue() {
+      return new GregorianCalendar();
+    }
+  };
+
+  /**
+   * singleton instance
+   */
+  public static final TimeSeriesUDF INSTANCE = new TimeSeriesUDF();
+
+  private TimeSeriesUDF() {
+  }
+
+  /**
+   * Truncates the given timestamp to the start of the requested granularity, e.g. for
+   * 2016-07-23 13:01:30.010:
+   *   year   -> 2016-01-01 00:00:00.000
+   *   month  -> 2016-07-01 00:00:00.000
+   *   day    -> 2016-07-23 00:00:00.000
+   *   hour   -> 2016-07-23 13:00:00.000
+   *   minute -> 2016-07-23 13:01:00.000
+   *   second -> 2016-07-23 13:01:30.000
+   * Truncation uses the JVM default time zone. The input object is not modified.
+   *
+   * @param data     timestamp to truncate, may be null
+   * @param function time series function name, case-insensitive (e.g. "hour")
+   * @return a new truncated timestamp, or null when data is null
+   * @throws IllegalArgumentException if function is null or not a supported function name
+   */
+  public Timestamp applyUDF(Timestamp data, String function) {
+    if (null == data) {
+      return null;
+    }
+    TimeSeriesFunction timeSeriesFunction = toTimeSeriesFunction(function);
+    Calendar calendar = calendarThreadLocal.get();
+    calendar.clear();
+    calendar.setTimeInMillis(data.getTime());
+    // HOUR_OF_DAY (24h) is used on purpose: Calendar.HOUR is the 12h field and would
+    // truncate afternoon timestamps to 12:00 instead of 00:00.
+    switch (timeSeriesFunction) {
+      case SECOND:
+        calendar.set(Calendar.MILLISECOND, 0);
+        break;
+      case MINUTE:
+        calendar.set(Calendar.MILLISECOND, 0);
+        calendar.set(Calendar.SECOND, 0);
+        break;
+      case HOUR:
+        calendar.set(Calendar.MILLISECOND, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        break;
+      case DAY:
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+        break;
+      case MONTH:
+        calendar.set(Calendar.DAY_OF_MONTH, 1);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+        break;
+      case YEAR:
+        // Calendar months are zero-based: January is 0, not 1
+        calendar.set(Calendar.MONTH, Calendar.JANUARY);
+        calendar.set(Calendar.DAY_OF_MONTH, 1);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid timeseries function name: " + function);
+    }
+    return new Timestamp(calendar.getTimeInMillis());
+  }
+
+  /**
+   * Resolves a case-insensitive function name such as "hour" to its enum constant.
+   *
+   * @throws IllegalArgumentException if the name is null or unknown
+   */
+  private static TimeSeriesFunction toTimeSeriesFunction(String function) {
+    if (null != function) {
+      for (TimeSeriesFunction candidate : TimeSeriesFunction.values()) {
+        if (candidate.name().equalsIgnoreCase(function)) {
+          return candidate;
+        }
+      }
+    }
+    throw new IllegalArgumentException("Invalid timeseries function name: " + function);
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index b9ec3f1..f65e98d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+import org.apache.carbondata.core.preagg.TimeSeriesUDF;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.BlockIndex;
@@ -289,7 +290,15 @@ public abstract class AbstractDataFileFooterConverter {
wrapperColumnSchema.setSortColumn(true);
}
}
- wrapperColumnSchema.setAggFunction(externalColumnSchema.getAggregate_function());
+ if (null != externalColumnSchema.getAggregate_function()) {
+ if (TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION
+ .contains(externalColumnSchema.getAggregate_function().toLowerCase())) {
+ wrapperColumnSchema
+ .setTimeSeriesFunction(externalColumnSchema.getAggregate_function().toLowerCase());
+ } else {
+ wrapperColumnSchema.setAggFunction(externalColumnSchema.getAggregate_function());
+ }
+ }
List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation =
externalColumnSchema.getParentColumnTableRelations();
if (null != parentColumnTableRelation) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
new file mode 100644
index 0000000..b60e487
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.timeseries
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/**
+ * Tests creation of timeseries pre-aggregate datamaps through the timeseries.eventTime and
+ * timeseries.hierarchy DMPROPERTIES, including rejection of invalid definitions.
+ */
+class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("CREATE TABLE mainTable(dataTime timestamp, name string, city string, age int) STORED BY 'org.apache.carbondata.format'")
+    sql(timeSeriesDataMapSql("dataTime", "second=1,hour=1,day=1,month=1,year=1", "dataTime"))
+  }
+
+  /**
+   * Builds a statement creating timeseries datamap agg0 on mainTable.
+   *
+   * @param eventTime    value of timeseries.eventTime
+   * @param hierarchy    value of timeseries.hierarchy
+   * @param selectColumn column selected and grouped by, next to sum(age)
+   */
+  private def timeSeriesDataMapSql(eventTime: String, hierarchy: String,
+      selectColumn: String): String = {
+    s"create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES " +
+    s"('timeseries.eventTime'='$eventTime', 'timeseries.hierarchy'='$hierarchy') " +
+    s"as select $selectColumn, sum(age) from mainTable group by $selectColumn"
+  }
+
+  /**
+   * Asserts that running statement fails with a MalformedCarbonCommandException whose
+   * message contains expectedMessage. intercept is used instead of try/assert(false)/catch
+   * because the exception thrown by a failed assert would itself be caught, making such a
+   * test always pass.
+   */
+  private def assertCreateFails(statement: String, expectedMessage: String): Unit = {
+    val exception = intercept[MalformedCarbonCommandException] {
+      sql(statement)
+    }
+    assert(exception.getMessage.contains(expectedMessage))
+  }
+
+  test("test timeseries create table Zero") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_second"), true, "maintable_agg0_second")
+    sql("drop datamap agg0_second on table mainTable")
+  }
+
+  test("test timeseries create table One") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_hour"), true, "maintable_agg0_hour")
+    sql("drop datamap agg0_hour on table mainTable")
+  }
+
+  test("test timeseries create table two") {
+    checkExistence(sql("DESCRIBE FORMATTED maintable_agg0_day"), true, "maintable_agg0_day")
+    sql("drop datamap agg0_day on table mainTable")
+  }
+
+  test("test timeseries create table three") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_month"), true, "maintable_agg0_month")
+    sql("drop datamap agg0_month on table mainTable")
+  }
+
+  test("test timeseries create table four") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_year"), true, "maintable_agg0_year")
+    sql("drop datamap agg0_year on table mainTable")
+  }
+
+  test("test timeseries create table five") {
+    // "sec" is not a supported hierarchy level
+    assertCreateFails(
+      timeSeriesDataMapSql("dataTime", "sec=1,hour=1,day=1,month=1,year=1", "dataTime"),
+      "Not supported heirarchy type")
+  }
+
+  test("test timeseries create table Six") {
+    // only the level value 1 is supported
+    assertCreateFails(timeSeriesDataMapSql("dataTime", "hour=2", "dataTime"),
+      "Unsupported Value for hierarchy")
+  }
+
+  test("test timeseries create table seven") {
+    // year is listed before month: levels must go from the finest to the coarsest
+    assertCreateFails(
+      timeSeriesDataMapSql("dataTime", "hour=1,day=1,year=1,month=1", "dataTime"),
+      "is in wrong order")
+  }
+
+  test("test timeseries create table Eight") {
+    // the event time column must be of timestamp type
+    assertCreateFails(timeSeriesDataMapSql("name", "hour=1,day=1,month=1,year=1", "name"),
+      "Timeseries event time is only supported on Timestamp column")
+  }
+
+  test("test timeseries create table Nine") {
+    // the event time column must be part of the select list
+    assertCreateFails(timeSeriesDataMapSql("dataTime", "hour=1,day=1,month=1,year=1", "name"),
+      "does not exists in select")
+  }
+
+  override def afterAll: Unit = {
+    sql("drop table if exists mainTable")
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 44f577d..37663ea 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -76,7 +76,7 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri
override def hashCode : Int = column.hashCode
}
-case class DataMapField(aggregateFunction: String = "",
+case class DataMapField(var aggregateFunction: String = "",
columnTableRelation: Option[ColumnTableRelation] = None) {
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 53b20c2..d68bc41 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,10 +17,12 @@
package org.apache.spark.sql
+import java.sql.Timestamp
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
import org.apache.spark.sql.hive._
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -61,6 +63,10 @@ class CarbonEnv {
// only then the CarbonPreAggregateDataLoadingRules would be applied to split the average
// column to sum and count.
sparkSession.udf.register("preAggLoad", () => "")
+
+ // added for handling timeseries function like hour, minute, day , month , year
+ sparkSession.udf.register("timeseries", (timestamp: Timestamp, timeSeriesFunction: String) =>
+ TimeSeriesUtil.timeSeriesUDF(timestamp, timeSeriesFunction))
synchronized {
if (!initialized) {
// update carbon session parameters , preserve thread parameters
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index f90abb8..a3aa36d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -22,8 +22,10 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -50,13 +52,30 @@ case class CarbonCreateDataMapCommand(
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
dmClassName.equalsIgnoreCase("preaggregate")) {
- CreatePreAggregateTableCommand(
- dataMapName,
- tableIdentifier,
- dmClassName,
- dmproperties,
- queryString.get
- ).processMetadata(sparkSession)
+ val timeHierarchyString = dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY)
+ if (timeHierarchyString.isDefined) {
+ val details = TimeSeriesUtil
+ .validateAndGetTimeSeriesHierarchyDetails(
+ timeHierarchyString.get)
+ val updatedDmProperties = dmproperties - CarbonCommonConstants.TIMESERIES_HIERARCHY
+ details.foreach { f =>
+ CreatePreAggregateTableCommand(dataMapName + '_' + f._1,
+ tableIdentifier,
+ dmClassName,
+ updatedDmProperties,
+ queryString.get,
+ Some(f._1)).run(sparkSession)
+ }
+ }
+ else {
+ CreatePreAggregateTableCommand(
+ dataMapName,
+ tableIdentifier,
+ dmClassName,
+ dmproperties,
+ queryString.get
+ ).processMetadata(sparkSession)
+ }
} else {
val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 1ebf511..1c23d3a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -44,7 +45,8 @@ case class CreatePreAggregateTableCommand(
parentTableIdentifier: TableIdentifier,
dmClassName: String,
dmProperties: Map[String, String],
- queryString: String)
+ queryString: String,
+ timeSeriesFunction: Option[String] = None)
extends AtomicRunnableCommand {
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
@@ -74,6 +76,15 @@ case class CreatePreAggregateTableCommand(
// updating the relation identifier, this will be stored in child table
// which can be used during dropping of pre-aggreate table as parent table will
// also get updated
+ if(timeSeriesFunction.isDefined) {
+ TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties, parentTable)
+ TimeSeriesUtil.validateEventTimeColumnExitsInSelect(
+ fieldRelationMap,
+ dmProperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME).get)
+ TimeSeriesUtil.updateTimeColumnSelect(fieldRelationMap,
+ dmProperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME).get,
+ timeSeriesFunction.get)
+ }
tableModel.parentTable = Some(parentTable)
tableModel.dataMapRelation = Some(fieldRelationMap)
val tablePath =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 95a711e..c602b0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -237,7 +237,7 @@ object PreAggregateUtil {
parentTableName,
parentDatabaseName, parentTableId = parentTableId)
case Average(attr: AttributeReference) =>
- getField(attr.name,
+ list += getField(attr.name,
attr.dataType,
"sum",
carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
new file mode 100644
index 0000000..9d4ce56
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.timeseries
+
+import java.sql.Timestamp
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.execution.command.{DataMapField, Field}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.preagg.TimeSeriesUDF
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/**
+ * Utility methods for creating timeseries pre-aggregate datamaps: validation of the
+ * timeseries DMPROPERTIES and of the select plan, and the timeseries UDF entry point.
+ */
+object TimeSeriesUtil {
+
+  /**
+   * Validates that the timeseries event time column configured in the datamap properties
+   * exists in the parent table and is a timestamp column.
+   *
+   * @param dmproperties data map properties
+   * @param parentTable  parent table
+   * @throws MalformedCarbonCommandException if the event time is not configured, the column
+   *                                         is not found or it is not a timestamp column
+   */
+  def validateTimeSeriesEventTime(dmproperties: Map[String, String],
+      parentTable: CarbonTable): Unit = {
+    val eventTime = dmproperties.getOrElse(CarbonCommonConstants.TIMESERIES_EVENTTIME,
+      throw new MalformedCarbonCommandException("Eventtime not defined in time series"))
+    // guard against a null lookup result instead of failing later with a NullPointerException
+    val carbonColumn = Option(parentTable.getColumnByName(parentTable.getTableName, eventTime))
+      .getOrElse(throw new MalformedCarbonCommandException(
+        s"Timeseries event time column $eventTime does not exist in table " +
+        parentTable.getTableName))
+    if (carbonColumn.getDataType != DataTypes.TIMESTAMP) {
+      throw new MalformedCarbonCommandException(
+        "Timeseries event time is only supported on Timestamp column")
+    }
+  }
+
+  /**
+   * Validates and parses the timeseries hierarchy, e.g. "second=1,hour=1,day=1".
+   * Every entry must be "level=1" where level is one of the supported timeseries functions,
+   * each level may appear only once, and levels must be listed from the finest to the
+   * coarsest granularity.
+   *
+   * @param timeSeriesHierarchyDetails time series hierarchy string
+   * @return (level, value) pairs, lower-cased and trimmed, in the given order
+   * @throws MalformedCarbonCommandException if the hierarchy is invalid
+   */
+  def validateAndGetTimeSeriesHierarchyDetails(timeSeriesHierarchyDetails: String): Array[
+    (String, String)] = {
+    val supportedLevels = TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION
+    val hierarchy = timeSeriesHierarchyDetails.toLowerCase.split(",").map { entry =>
+      entry.split("=").map(_.trim) match {
+        case Array(level, value) =>
+          // checking hierarchy name is valid or not
+          if (!supportedLevels.contains(level)) {
+            throw new MalformedCarbonCommandException(s"Not supported heirarchy type: $level")
+          }
+          // validating hierarchy level value, only 1 is supported
+          if (value != "1") {
+            throw new MalformedCarbonCommandException(
+              s"Unsupported Value for hierarchy:$level=$value")
+          }
+          (level, value)
+        case _ =>
+          throw new MalformedCarbonCommandException(
+            s"Invalid timeseries hierarchy entry: ${ entry.trim }, expected <level>=1")
+      }
+    }
+    val levels = hierarchy.map(_._1)
+    if (levels.distinct.length != levels.length) {
+      throw new MalformedCarbonCommandException(
+        s"Duplicate level in timeseries hierarchy: $timeSeriesHierarchyDetails")
+    }
+    // every level must be coarser than the one before it, not just coarser than the first
+    val positions = levels.map(supportedLevels.indexOf(_))
+    if (positions.zip(positions.drop(1)).exists { case (previous, next) => next < previous }) {
+      throw new MalformedCarbonCommandException(s"$timeSeriesHierarchyDetails is in wrong order")
+    }
+    hierarchy
+  }
+
+  /**
+   * Validates that the timeseries event time column is projected directly (without an
+   * aggregate function) in the select statement of the datamap.
+   *
+   * @param fieldMapping     fields from select plan
+   * @param timeSeriesColumn timeseries column name
+   * @throws MalformedCarbonCommandException if the column is not part of the select
+   */
+  def validateEventTimeColumnExitsInSelect(
+      fieldMapping: mutable.LinkedHashMap[Field, DataMapField],
+      timeSeriesColumn: String): Unit = {
+    if (findEventTimeField(fieldMapping, timeSeriesColumn).isEmpty) {
+      throw new MalformedCarbonCommandException(s"Time series column ${ timeSeriesColumn } does " +
+        s"not exists in select")
+    }
+  }
+
+  /**
+   * Marks the select entry of the timeseries event time column with the timeseries function
+   * by storing the function name as its aggregate function.
+   *
+   * @param fieldMapping       fields from select plan, updated in place
+   * @param timeSeriesColumn   timeseries column name
+   * @param timeSeriesFunction timeseries function, e.g. "hour"
+   * @throws MalformedCarbonCommandException if the column is not part of the select
+   */
+  def updateTimeColumnSelect(
+      fieldMapping: mutable.LinkedHashMap[Field, DataMapField],
+      timeSeriesColumn: String,
+      timeSeriesFunction: String): Unit = {
+    val (_, dataMapField) = findEventTimeField(fieldMapping, timeSeriesColumn).getOrElse(
+      throw new MalformedCarbonCommandException(s"Time series column ${ timeSeriesColumn } does " +
+        s"not exists in select"))
+    dataMapField.aggregateFunction = timeSeriesFunction
+  }
+
+  /**
+   * Finds the select entry that projects the given parent column directly, i.e. it has a
+   * parent column relation with that name (case-insensitive) and no aggregate function.
+   */
+  private def findEventTimeField(
+      fieldMapping: mutable.LinkedHashMap[Field, DataMapField],
+      timeSeriesColumn: String): Option[(Field, DataMapField)] = {
+    fieldMapping.find { case (_, dataMapField) =>
+      dataMapField.columnTableRelation.exists(
+        _.parentColumnName.equalsIgnoreCase(timeSeriesColumn)) &&
+      dataMapField.aggregateFunction.isEmpty
+    }
+  }
+
+  /**
+   * UDF for timeseries, delegating to TimeSeriesUDF.INSTANCE.applyUDF.
+   *
+   * @param timestamp              timestamp
+   * @param timeSeriesFunctionType time series function
+   * @return updated timestamp based on function
+   */
+  def timeSeriesUDF(timestamp: Timestamp, timeSeriesFunctionType: String): Timestamp = {
+    TimeSeriesUDF.INSTANCE.applyUDF(timestamp, timeSeriesFunctionType)
+  }
+}
+