Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/07 22:39:43 UTC
[2/2] carbondata git commit: [CARBONDATA-1519][PreAgg-Timeseries] Support Query and Load on timeseries table
This closes #1626
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e2a79eeb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e2a79eeb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e2a79eeb
Branch: refs/heads/master
Commit: e2a79eebbcbe641f657e84ccf86a9a7e84e8e735
Parents: 0da0a4f
Author: kumarvishal <ku...@gmail.com>
Authored: Tue Dec 5 20:56:58 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Dec 8 04:09:25 2017 +0530
----------------------------------------------------------------------
.../schema/table/AggregationDataMapSchema.java | 129 +++++++++-
.../core/preagg/AggregateTableSelector.java | 31 ++-
.../carbondata/core/preagg/QueryColumn.java | 20 +-
.../core/preagg/TimeSeriesFunction.java | 40 ----
.../core/preagg/TimeSeriesFunctionEnum.java | 53 +++++
.../carbondata/core/preagg/TimeSeriesUDF.java | 5 +-
.../apache/carbondata/core/util/CarbonUtil.java | 32 +++
.../src/test/resources/timeseriestest.csv | 7 +
.../TestPreAggregateTableSelection.scala | 1 -
.../timeseries/TestTimeSeriesCreateTable.scala | 18 +-
.../timeseries/TestTimeseriesDataLoad.scala | 79 +++++++
.../TestTimeseriesTableSelection.scala | 131 +++++++++++
.../carbondata/spark/util/CarbonSparkUtil.scala | 7 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 6 +-
.../apache/spark/sql/CarbonExpressions.scala | 16 +-
.../preaaggregate/PreAggregateListeners.scala | 65 +++--
.../preaaggregate/PreAggregateUtil.scala | 105 ++++++++-
.../command/timeseries/TimeSeriesFunction.scala | 33 +++
.../command/timeseries/TimeseriesUtil.scala | 15 --
.../spark/sql/hive/CarbonFileMetastore.scala | 5 +-
.../sql/hive/CarbonPreAggregateRules.scala | 235 ++++++++++++++-----
.../src/main/spark2.1/CarbonSessionState.scala | 4 +-
.../src/main/spark2.2/CarbonSessionState.scala | 4 +-
23 files changed, 885 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
index 9bfb22c..8f6a2d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+import org.apache.carbondata.core.preagg.TimeSeriesFunctionEnum;
/**
* data map schema class for pre aggregation
@@ -50,6 +51,17 @@ public class AggregationDataMapSchema extends DataMapSchema {
*/
private Map<String, Set<String>> parentColumnToAggregationsMapping;
+ /**
+ * whether it is a timeseries data map
+ */
+ private boolean isTimeseriesDataMap;
+
+ /**
+ * below ordinal will be used while sorting the data maps
+ * to support rollup during loading
+ */
+ private int ordinal = Integer.MAX_VALUE;
+
public AggregationDataMapSchema(String dataMapName, String className) {
super(dataMapName, className);
}
@@ -63,6 +75,28 @@ public class AggregationDataMapSchema extends DataMapSchema {
}
/**
+ * Below method will be used to get the child column on which neither an
+ * aggregate function nor a time series function is applied
+ * @param columnName
+ * parent column name
+ * @return child column schema
+ */
+ public ColumnSchema getNonAggNonTimeseriesChildColBasedByParent(String columnName) {
+ Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName);
+ if (null != columnSchemas) {
+ Iterator<ColumnSchema> iterator = columnSchemas.iterator();
+ while (iterator.hasNext()) {
+ ColumnSchema next = iterator.next();
+ if ((null == next.getAggFunction() || next.getAggFunction().isEmpty())
+ && (null == next.getTimeSeriesFunction() || next.getTimeSeriesFunction().isEmpty())) {
+ return next;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
* Below method will be used to get the columns on which aggregate function is not applied
* @param columnName
* parent column name
@@ -74,7 +108,28 @@ public class AggregationDataMapSchema extends DataMapSchema {
Iterator<ColumnSchema> iterator = columnSchemas.iterator();
while (iterator.hasNext()) {
ColumnSchema next = iterator.next();
- if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) {
+ if ((null == next.getAggFunction() || next.getAggFunction().isEmpty())) {
+ return next;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Below method will be used to get the child column on which the given
+ * time series function is applied
+ *
+ * @param columnName parent column name
+ * @param timeseriesFunction time series function applied on the column
+ * @return child column schema
+ */
+ public ColumnSchema getTimeseriesChildColBasedByParent(String columnName,
+ String timeseriesFunction) {
+ Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName);
+ if (null != columnSchemas) {
+ Iterator<ColumnSchema> iterator = columnSchemas.iterator();
+ while (iterator.hasNext()) {
+ ColumnSchema next = iterator.next();
+ if (timeseriesFunction.equals(next.getTimeSeriesFunction())) {
return next;
}
}
@@ -126,6 +181,28 @@ public class AggregationDataMapSchema extends DataMapSchema {
}
/**
+ * Below method will be used to get the column schema based on parent column name
+ * @param columnName
+ * parent column name
+ * @param timeseriesFunction
+ * timeseries function applied on column
+ * @return child column schema
+ */
+ public ColumnSchema getTimeseriesChildColByParent(String columnName, String timeseriesFunction) {
+ List<ColumnSchema> listOfColumns = childSchema.getListOfColumns();
+ for (ColumnSchema columnSchema : listOfColumns) {
+ List<ParentColumnTableRelation> parentColumnTableRelations =
+ columnSchema.getParentColumnTableRelations();
+ if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1
+ && parentColumnTableRelations.get(0).getColumnName().equals(columnName)
+ && timeseriesFunction.equalsIgnoreCase(columnSchema.getTimeSeriesFunction())) {
+ return columnSchema;
+ }
+ }
+ return null;
+ }
+
+ /**
* Below method is to check if parent column with matching aggregate function
* @param parentColumnName
* parent column name
@@ -175,6 +252,15 @@ public class AggregationDataMapSchema extends DataMapSchema {
private void fillNonAggFunctionColumns(List<ColumnSchema> listOfColumns) {
parentToNonAggChildMapping = new HashMap<>();
for (ColumnSchema column : listOfColumns) {
+ if (!isTimeseriesDataMap) {
+ isTimeseriesDataMap =
+ null != column.getTimeSeriesFunction() && !column.getTimeSeriesFunction().isEmpty();
+ if (isTimeseriesDataMap) {
+ this.ordinal =
+ TimeSeriesFunctionEnum.valueOf(column.getTimeSeriesFunction().toUpperCase())
+ .getOrdinal();
+ }
+ }
if (null == column.getAggFunction() || column.getAggFunction().isEmpty()) {
fillMappingDetails(column, parentToNonAggChildMapping);
}
@@ -210,4 +296,45 @@ public class AggregationDataMapSchema extends DataMapSchema {
}
}
+ public boolean isTimeseriesDataMap() {
+ return isTimeseriesDataMap;
+ }
+
+ /**
+ * Below method is to support rollup while loading data into a pre-aggregate table.
+ * In case of a timeseries year-level table, data loading can be done from the
+ * month-level table or from any level below year, for example day, hour, minute or second.
+ * @TODO need to handle pre-aggregate tables without timeseries
+ *
+ * @param aggregationDataMapSchema candidate data map schema to roll up from
+ * @return whether aggregation data map can be selected or not
+ */
+ public boolean canSelectForRollup(AggregationDataMapSchema aggregationDataMapSchema) {
+ List<ColumnSchema> listOfColumns = childSchema.getListOfColumns();
+ for (ColumnSchema column : listOfColumns) {
+ List<ParentColumnTableRelation> parentColumnTableRelations =
+ column.getParentColumnTableRelations();
+ //@TODO handle scenario when an aggregate datamap column is derived from multiple
+ // columns, which is not supported currently
+ if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1) {
+ if (null != column.getAggFunction() && !column.getAggFunction().isEmpty()) {
+ if (null == aggregationDataMapSchema
+ .getAggChildColByParent(parentColumnTableRelations.get(0).getColumnName(),
+ column.getAggFunction())) {
+ return false;
+ }
+ } else {
+ if (null == aggregationDataMapSchema.getNonAggChildColBasedByParent(
+ parentColumnTableRelations.get(0).getColumnName())) {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ public int getOrdinal() {
+ return ordinal;
+ }
}
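
The ordinal introduced above is what drives load-time rollup ordering: aggregation data maps are sorted by ordinal so finer granularities load first and coarser levels can roll up from an already-loaded child table (see canSelectForRollup above). A minimal Scala sketch of that selection logic, using a hypothetical AggSchema stand-in; all names below are illustrative, not the project's API:

    case class AggSchema(name: String, ordinal: Int, canRollupFrom: Set[String])

    object RollupOrderingSketch {
      def main(args: Array[String]): Unit = {
        // ordinals mirror TimeSeriesFunctionEnum: second=0 ... year=5
        val dataMaps = Seq(
          AggSchema("agg0_year", 5, Set("agg0_month", "agg0_day")),
          AggSchema("agg0_month", 4, Set("agg0_day")),
          AggSchema("agg0_day", 3, Set.empty),
          AggSchema("agg0_second", 0, Set.empty))
        // finer levels first, so coarser levels can roll up from them
        val loaded = scala.collection.mutable.ListBuffer.empty[AggSchema]
        dataMaps.sortBy(_.ordinal).foreach { dm =>
          // the last applicable entry is the coarsest table already loaded
          val source = loaded.filter(l => dm.canRollupFrom.contains(l.name)).lastOption
          println(s"${dm.name} loads from ${source.map(_.name).getOrElse("main table")}")
          loaded += dm
        }
      }
    }
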
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
index 8b87a1a..5347567 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
@@ -70,8 +70,8 @@ public class AggregateTableSelector {
AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
isMatch = true;
for (QueryColumn queryColumn : projectionColumn) {
- ColumnSchema columnSchemaByParentName = aggregationDataMapSchema
- .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName());
+ ColumnSchema columnSchemaByParentName =
+ getColumnSchema(queryColumn, aggregationDataMapSchema);
if (null == columnSchemaByParentName) {
isMatch = false;
}
@@ -95,8 +95,8 @@ public class AggregateTableSelector {
isMatch = true;
for (QueryColumn queryColumn : filterColumns) {
AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
- ColumnSchema columnSchemaByParentName = aggregationDataMapSchema
- .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName());
+ ColumnSchema columnSchemaByParentName =
+ getColumnSchema(queryColumn, aggregationDataMapSchema);
if (null == columnSchemaByParentName) {
isMatch = false;
}
@@ -132,4 +132,27 @@ public class AggregateTableSelector {
}
return selectedDataMapSchema;
}
+
+ /**
+ * Below method will be used to get column schema for projection and
+ * filter query column
+ *
+ * @param queryColumn query column
+ * @param aggregationDataMapSchema selected data map schema
+ * @return column schema
+ */
+ private ColumnSchema getColumnSchema(QueryColumn queryColumn,
+ AggregationDataMapSchema aggregationDataMapSchema) {
+ ColumnSchema columnSchemaByParentName = null;
+ if (!queryColumn.getTimeseriesFunction().isEmpty()) {
+ columnSchemaByParentName = aggregationDataMapSchema
+ .getTimeseriesChildColBasedByParent(queryColumn.getColumnSchema().getColumnName(),
+ queryColumn.getTimeseriesFunction());
+ } else {
+ columnSchemaByParentName = aggregationDataMapSchema
+ .getNonAggNonTimeseriesChildColBasedByParent(
+ queryColumn.getColumnSchema().getColumnName());
+ }
+ return columnSchemaByParentName;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
index c889716..c91a703 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
@@ -44,12 +44,18 @@ public class QueryColumn {
*/
private boolean isFilterColumn;
+ /**
+ * timeseries udf applied on column
+ */
+ private String timeseriesFunction;
+
public QueryColumn(ColumnSchema columnSchema, String changedDataType, String aggFunction,
- boolean isFilterColumn) {
+ boolean isFilterColumn, String timeseriesFunction) {
this.columnSchema = columnSchema;
this.changedDataType = changedDataType;
this.aggFunction = aggFunction;
this.isFilterColumn = isFilterColumn;
+ this.timeseriesFunction = timeseriesFunction;
}
public ColumnSchema getColumnSchema() {
@@ -68,6 +74,10 @@ public class QueryColumn {
return isFilterColumn;
}
+ public String getTimeseriesFunction() {
+ return timeseriesFunction;
+ }
+
@Override public boolean equals(Object o) {
if (this == o) {
return true;
@@ -82,12 +92,18 @@ public class QueryColumn {
if (!columnSchema.equals(that.columnSchema)) {
return false;
}
- return aggFunction != null ? aggFunction.equals(that.aggFunction) : that.aggFunction == null;
+ if (!(aggFunction != null ? aggFunction.equals(that.aggFunction) : that.aggFunction == null)) {
+ return false;
+ }
+ return timeseriesFunction != null ?
+ timeseriesFunction.equals(that.timeseriesFunction) :
+ that.timeseriesFunction == null;
}
@Override public int hashCode() {
int result = columnSchema.hashCode();
result = 31 * result + (aggFunction != null ? aggFunction.hashCode() : 0);
+ result = 31 * result + (timeseriesFunction != null ? timeseriesFunction.hashCode() : 0);
result = 31 * result + (isFilterColumn ? 1 : 0);
return result;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java
deleted file mode 100644
index 02ff753..0000000
--- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.preagg;
-
-/**
- * enum for timeseries function
- */
-public enum TimeSeriesFunction {
- SECOND("second"),
- MINUTE("minute"),
- HOUR("hour"),
- DAY("day"),
- MONTH("month"),
- YEAR("year");
-
- private String name;
-
- TimeSeriesFunction(String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
new file mode 100644
index 0000000..5d0d2af
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.preagg;
+
+/**
+ * enum for timeseries function
+ */
+public enum TimeSeriesFunctionEnum {
+ SECOND("second", 0),
+ MINUTE("minute", 1),
+ HOUR("hour", 2),
+ DAY("day", 3),
+ MONTH("month", 4),
+ YEAR("year", 5);
+
+ /**
+ * name of the function
+ */
+ private String name;
+
+ /**
+ * ordinal for function
+ */
+ private int ordinal;
+
+ TimeSeriesFunctionEnum(String name, int ordinal) {
+ this.name = name;
+ this.ordinal = ordinal;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getOrdinal() {
+ return ordinal;
+ }
+}
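
Lookup into the enum is by constant name, which is why callers upper-case the user-supplied granularity first. A small usage sketch, assuming carbondata-core is on the classpath:

    import org.apache.carbondata.core.preagg.TimeSeriesFunctionEnum

    val fn = TimeSeriesFunctionEnum.valueOf("hour".toUpperCase)
    println(fn.getName)    // hour
    println(fn.getOrdinal) // 2
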
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
index 50cb052..3aa4190 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
@@ -66,8 +66,9 @@ public class TimeSeriesUDF {
Calendar calendar = calanderThreadLocal.get();
calendar.clear();
calendar.setTimeInMillis(data.getTime());
- TimeSeriesFunction timeSeriesFunction = TimeSeriesFunction.valueOf(function);
- switch (timeSeriesFunction) {
+ TimeSeriesFunctionEnum timeSeriesFunctionEnum =
+ TimeSeriesFunctionEnum.valueOf(function.toUpperCase());
+ switch (timeSeriesFunctionEnum) {
case SECOND:
calendar.set(Calendar.MILLISECOND, 0);
break;
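
The UDF truncates a timestamp to the requested granularity by zeroing out the calendar fields below it. A simplified sketch for the HOUR case (an assumed simplification; the real UDF covers all levels and reuses a thread-local Calendar):

    import java.sql.Timestamp
    import java.util.Calendar

    def truncateToHour(ts: Timestamp): Timestamp = {
      val cal = Calendar.getInstance()
      cal.clear()
      cal.setTimeInMillis(ts.getTime)
      // zero everything below hour granularity
      cal.set(Calendar.MILLISECOND, 0)
      cal.set(Calendar.SECOND, 0)
      cal.set(Calendar.MINUTE, 0)
      new Timestamp(cal.getTimeInMillis)
    }

    println(truncateToHour(Timestamp.valueOf("2016-02-23 01:01:30")))
    // prints 2016-02-23 01:00:00.0
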
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ab85684..148098d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -67,7 +67,9 @@ import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
@@ -2297,5 +2299,35 @@ public final class CarbonUtil {
return dataAndIndexSize;
}
+ /**
+ * Utility function to check whether the table has a timeseries datamap or not
+ * @param carbonTable
+ * @return true if a timeseries data map is present
+ */
+ public static boolean hasTimeSeriesDataMap(CarbonTable carbonTable) {
+ List<DataMapSchema> dataMapSchemaList = carbonTable.getTableInfo().getDataMapSchemaList();
+ for (DataMapSchema dataMapSchema : dataMapSchemaList) {
+ if (dataMapSchema instanceof AggregationDataMapSchema
+ && ((AggregationDataMapSchema) dataMapSchema).isTimeseriesDataMap()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Utility function to check whether the table has an aggregation datamap or not
+ * @param carbonTable
+ * @return true if an aggregation data map is present
+ */
+ public static boolean hasAggregationDataMap(CarbonTable carbonTable) {
+ List<DataMapSchema> dataMapSchemaList = carbonTable.getTableInfo().getDataMapSchemaList();
+ for (DataMapSchema dataMapSchema : dataMapSchemaList) {
+ if (dataMapSchema instanceof AggregationDataMapSchema) {
+ return true;
+ }
+ }
+ return false;
+ }
+
}
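
These helpers replace the coarser table.hasDataMapSchema checks in the listeners below. A hedged usage sketch (carbonTable is assumed to come from the metastore):

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.util.CarbonUtil

    def guardChildTableWork(carbonTable: CarbonTable): Unit = {
      if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
        // block unsupported operations / trigger child table loads
      }
      if (CarbonUtil.hasTimeSeriesDataMap(carbonTable)) {
        // apply timeseries-specific rollup handling
      }
    }
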
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/resources/timeseriestest.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/timeseriestest.csv b/integration/spark-common-test/src/test/resources/timeseriestest.csv
new file mode 100644
index 0000000..1674ac9
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/timeseriestest.csv
@@ -0,0 +1,7 @@
+mytime,name,age
+2016-2-23 01:01:30,vishal,10
+2016-2-23 01:01:40,kunal,20
+2016-2-23 01:01:50,shahid,30
+2016-2-23 01:02:30,kk,40
+2016-2-23 01:02:40,rahul,50
+2016-2-23 01:02:50,ravi,50
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index dc117a5..d84ec3b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -16,7 +16,6 @@
*/
package org.apache.carbondata.integration.spark.testsuite.preaggregate
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.CarbonRelation
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index b60e487..5cbcb26 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -1,7 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.carbondata.integration.spark.testsuite.timeseries
import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, Ignore}
class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
new file mode 100644
index 0000000..217edea
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.timeseries
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+@Ignore
+class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll: Unit = {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ sql("drop table if exists mainTable")
+ sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED BY 'org.apache.carbondata.format'")
+ sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='mytime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select mytime, sum(age) from mainTable group by mytime")
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable")
+ }
+
+ test("test Year level timeseries data validation") {
+ checkAnswer( sql("select * from maintable_agg0_year"),
+ Seq(Row(Timestamp.valueOf("2016-01-01 00:00:00.0"),200)))
+ }
+
+ test("test month level timeseries data validation") {
+ checkAnswer( sql("select * from maintable_agg0_month"),
+ Seq(Row(Timestamp.valueOf("2016-02-01 00:00:00.0"),200)))
+ }
+
+ test("test day level timeseries data validation") {
+ checkAnswer( sql("select * from maintable_agg0_day"),
+ Seq(Row(Timestamp.valueOf("2016-02-23 00:00:00.0"),200)))
+ }
+
+ test("test hour level timeseries data validation") {
+ checkAnswer( sql("select * from maintable_agg0_hour"),
+ Seq(Row(Timestamp.valueOf("2016-02-23 01:00:00.0"),200)))
+ }
+
+ test("test minute level timeseries data validation") {
+ checkAnswer( sql("select * from maintable_agg0_minute"),
+ Seq(Row(Timestamp.valueOf("2016-02-23 01:01:00.0"),60),
+ Row(Timestamp.valueOf("2016-02-23 01:02:00.0"),140)))
+ }
+
+ test("test second level timeseries data validation") {
+ checkAnswer( sql("select * from maintable_agg0_second"),
+ Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"),10),
+ Row(Timestamp.valueOf("2016-02-23 01:01:40.0"),20),
+ Row(Timestamp.valueOf("2016-02-23 01:01:50.0"),30),
+ Row(Timestamp.valueOf("2016-02-23 01:02:30.0"),40),
+ Row(Timestamp.valueOf("2016-02-23 01:02:40.0"),50),
+ Row(Timestamp.valueOf("2016-02-23 01:02:50.0"),50)))
+ }
+
+ override def afterAll: Unit = {
+ sql("drop table if exists mainTable")
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
new file mode 100644
index 0000000..0990f87
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.timeseries
+
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+
+class TestTimeseriesTableSelection extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll: Unit = {
+ sql("drop table if exists mainTable")
+ sql("CREATE TABLE mainTable(dataTime timestamp, name string, city string, age int) STORED BY 'org.apache.carbondata.format'")
+ sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select dataTime, sum(age) from mainTable group by dataTime")
+ }
+ test("test PreAggregate table selection 1") {
+ val df = sql("select dataTime from mainTable group by dataTime")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable")
+ }
+
+ test("test PreAggregate table selection 2") {
+ val df = sql("select timeseries(dataTime,'hour') from mainTable group by timeseries(dataTime,'hour')")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0_hour")
+ }
+
+ test("test PreAggregate table selection 3") {
+ val df = sql("select timeseries(dataTime,'milli') from mainTable group by timeseries(dataTime,'milli')")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable")
+ }
+
+ test("test PreAggregate table selection 4") {
+ val df = sql("select timeseries(dataTime,'year') from mainTable group by timeseries(dataTime,'year')")
+ preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_year")
+ }
+
+ test("test PreAggregate table selection 5") {
+ val df = sql("select timeseries(dataTime,'day') from mainTable group by timeseries(dataTime,'day')")
+ preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_day")
+ }
+
+ test("test PreAggregate table selection 6") {
+ val df = sql("select timeseries(dataTime,'month') from mainTable group by timeseries(dataTime,'month')")
+ preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_month")
+ }
+
+ test("test PreAggregate table selection 7") {
+ val df = sql("select timeseries(dataTime,'minute') from mainTable group by timeseries(dataTime,'minute')")
+ preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_minute")
+ }
+
+ test("test PreAggregate table selection 8") {
+ val df = sql("select timeseries(dataTime,'second') from mainTable group by timeseries(dataTime,'second')")
+ preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_second")
+ }
+
+ test("test PreAggregate table selection 9") {
+ val df = sql("select timeseries(dataTime,'hour') from mainTable where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour')")
+ preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
+ }
+
+ test("test PreAggregate table selection 10") {
+ val df = sql("select timeseries(dataTime,'hour') from mainTable where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')")
+ preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
+ }
+
+ test("test PreAggregate table selection 11") {
+ val df = sql("select timeseries(dataTime,'hour'),sum(age) from mainTable where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')")
+ preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
+ }
+
+ test("test PreAggregate table selection 12") {
+ val df = sql("select timeseries(dataTime,'hour')as hourlevel,sum(age) as sum from mainTable where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')")
+ preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
+ }
+
+ test("test PreAggregate table selection 13") {
+ val df = sql("select timeseries(dataTime,'hour')as hourlevel,sum(age) as sum from mainTable where timeseries(dataTime,'hour')='x' and name='vishal' group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')")
+ preAggTableValidator(df.queryExecution.analyzed,"maintable")
+ }
+
+ def preAggTableValidator(plan: LogicalPlan, actualTableName: String): Unit = {
+ var isValidPlan = false
+ plan.transform {
+ // first check whether the pre-aggregate scala function is applied; if it is present
+ // in the plan, the call is from the create pre-aggregate table flow, so there is
+ // no need to transform the query plan
+ case ca:CarbonRelation =>
+ if (ca.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+ val relation = ca.asInstanceOf[CarbonDatasourceHadoopRelation]
+ if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) {
+ isValidPlan = true
+ }
+ }
+ ca
+ case logicalRelation:LogicalRelation =>
+ if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+ val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) {
+ isValidPlan = true
+ }
+ }
+ logicalRelation
+ }
+ assert(isValidPlan)
+ }
+
+ override def afterAll: Unit = {
+ sql("drop table if exists mainTable")
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 7cc3d11..5f78397 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.util.CarbonUtil
case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
@@ -41,7 +42,11 @@ object CarbonSparkUtil {
f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
!f.getDataType.isComplexType)
}
- CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
+ CarbonMetaData(dimensionsAttr,
+ measureAttr,
+ carbonTable,
+ DictionaryMap(dictionary.toMap),
+ CarbonUtil.hasAggregationDataMap(carbonTable))
}
def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index d68bc41..6317177 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,12 +17,11 @@
package org.apache.spark.sql
-import java.sql.Timestamp
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
import org.apache.spark.sql.hive._
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -65,8 +64,7 @@ class CarbonEnv {
sparkSession.udf.register("preAggLoad", () => "")
// added for handling timeseries function like hour, minute, day , month , year
- sparkSession.udf.register("timeseries", (timestamp: Timestamp, timeSeriesFunction: String) =>
- TimeSeriesUtil.timeSeriesUDF(timestamp, timeSeriesFunction))
+ sparkSession.udf.register("timeseries", new TimeSeriesFunction)
synchronized {
if (!initialized) {
// update carbon session parameters , preserve thread parameters
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
index 8e157fd..c1f9e8a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, ScalaUDF}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.command.DescribeTableCommand
import org.apache.spark.sql.types.DataType
@@ -84,4 +84,18 @@ object CarbonExpressions {
}
}
}
+
+ /**
+ * unapply method of Scala UDF
+ */
+ object CarbonScalaUDF {
+ def unapply(expression: Expression): Option[(ScalaUDF)] = {
+ expression match {
+ case a: ScalaUDF =>
+ Some(a)
+ case _ =>
+ None
+ }
+ }
+ }
}
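
The CarbonScalaUDF extractor lets the pre-aggregate rules pattern-match ScalaUDF nodes (such as the timeseries UDF) while walking a plan. A minimal sketch of how it can be used, assuming the Spark 2.x catalyst API:

    import org.apache.spark.sql.CarbonExpressions.CarbonScalaUDF
    import org.apache.spark.sql.catalyst.expressions.Expression

    // true if any ScalaUDF appears anywhere in the expression tree
    def containsScalaUdf(expr: Expression): Boolean =
      expr.find {
        case CarbonScalaUDF(_) => true
        case _ => false
      }.isDefined
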
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 4315e05..747e447 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -18,19 +18,15 @@
package org.apache.spark.sql.execution.command.preaaggregate
import scala.collection.JavaConverters._
+import scala.collection.mutable
-import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand}
-import org.apache.spark.sql.execution.command.AlterTableModel
-import org.apache.spark.sql.CarbonSession
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
+import org.apache.spark.sql.execution.command.AlterTableModel
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
+import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
object LoadPostAggregateListener extends OperationEventListener {
/**
@@ -43,20 +39,47 @@ object LoadPostAggregateListener extends OperationEventListener {
val sparkSession = loadEvent.sparkSession
val carbonLoadModel = loadEvent.carbonLoadModel
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- if (table.hasDataMapSchema) {
- for (dataMapSchema: DataMapSchema <- table.getTableInfo.getDataMapSchemaList.asScala) {
+ if (CarbonUtil.hasAggregationDataMap(table)) {
+ // getting all the aggregate datamap schemas
+ val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
+ .filter(_.isInstanceOf[AggregationDataMapSchema])
+ .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
+ // sorting the datamaps on ordinal for timeseries rollup
+ val sortedList = aggregationDataMapList.sortBy(_.getOrdinal)
+ val parentTableName = table.getTableName
+ val databaseName = table.getDatabaseName
+ val list = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema]
+ for (dataMapSchema: AggregationDataMapSchema <- sortedList) {
val childTableName = dataMapSchema.getRelationIdentifier.getTableName
val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
+ val childSelectQuery = if (!dataMapSchema.isTimeseriesDataMap) {
+ dataMapSchema.getProperties.get("CHILD_SELECT QUERY")
+ } else {
+ // for timeseries rollup policy
+ val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list,
+ dataMapSchema)
+ // if none of the rollup data maps is selected, hit the main table and prepare the query
+ if (tableSelectedForRollup.isEmpty) {
+ PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema,
+ parentTableName,
+ databaseName)
+ } else {
+ // otherwise hit the select rollup datamap schema
+ PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema,
+ tableSelectedForRollup.get,
+ databaseName)
+ }
+ }
PreAggregateUtil.startDataLoadForDataMap(
table,
TableIdentifier(childTableName, Some(childDatabaseName)),
- dataMapSchema.getProperties.get("CHILD_SELECT QUERY"),
+ childSelectQuery,
carbonLoadModel.getSegmentId,
validateSegments = false,
sparkSession)
+ }
}
}
- }
}
/**
@@ -74,7 +97,7 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen
val carbonTable = compactionEvent.carbonTable
val compactionType = compactionEvent.carbonMergerMapping.campactionType
val sparkSession = compactionEvent.sparkSession
- if (carbonTable.hasDataMapSchema) {
+ if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
carbonTable.getTableInfo.getDataMapSchemaList.asScala.foreach { dataMapSchema =>
val childRelationIdentifier = dataMapSchema.getRelationIdentifier
val alterTableModel = AlterTableModel(Some(childRelationIdentifier.getDatabaseName),
@@ -120,7 +143,7 @@ object PreAggregateDataTypeChangePreListener extends OperationEventListener {
val carbonTable = dataTypeChangePreListener.carbonTable
val alterTableDataTypeChangeModel = dataTypeChangePreListener.alterTableDataTypeChangeModel
val columnToBeAltered: String = alterTableDataTypeChangeModel.columnName
- if (carbonTable.hasDataMapSchema) {
+ if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
val dataMapSchemas = carbonTable.getTableInfo.getDataMapSchemaList
dataMapSchemas.asScala.foreach { dataMapSchema =>
val childColumns = dataMapSchema.getChildSchema.getListOfColumns
@@ -170,7 +193,7 @@ object PreAggregateDeleteSegmentByDatePreListener extends OperationEventListener
val deleteSegmentByDatePreEvent = event.asInstanceOf[DeleteSegmentByDatePreEvent]
val carbonTable = deleteSegmentByDatePreEvent.carbonTable
if (carbonTable != null) {
- if (carbonTable.hasDataMapSchema) {
+ if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on tables which have a pre-aggregate table. " +
"Drop pre-aggregation table to continue")
@@ -194,7 +217,7 @@ object PreAggregateDeleteSegmentByIdPreListener extends OperationEventListener {
val tableEvent = event.asInstanceOf[DeleteSegmentByIdPreEvent]
val carbonTable = tableEvent.carbonTable
if (carbonTable != null) {
- if (carbonTable.hasDataMapSchema) {
+ if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on tables which have a pre-aggregate table")
}
@@ -219,7 +242,7 @@ object PreAggregateDropColumnPreListener extends OperationEventListener {
val carbonTable = dataTypeChangePreListener.carbonTable
val alterTableDropColumnModel = dataTypeChangePreListener.alterTableDropColumnModel
val columnsToBeDropped = alterTableDropColumnModel.columns
- if (carbonTable.hasDataMapSchema) {
+ if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
val dataMapSchemas = carbonTable.getTableInfo.getDataMapSchemaList
dataMapSchemas.asScala.foreach { dataMapSchema =>
val parentColumnNames = dataMapSchema.getChildSchema.getListOfColumns.asScala
@@ -257,7 +280,7 @@ object PreAggregateRenameTablePreListener extends OperationEventListener {
throw new UnsupportedOperationException(
"Rename operation for pre-aggregate table is not supported.")
}
- if (carbonTable.hasDataMapSchema) {
+ if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
throw new UnsupportedOperationException(
"Rename operation is not supported for table with pre-aggregate tables")
}
@@ -275,7 +298,7 @@ object UpdatePreAggregatePreListener extends OperationEventListener {
val tableEvent = event.asInstanceOf[UpdateTablePreEvent]
val carbonTable = tableEvent.carbonTable
if (carbonTable != null) {
- if (carbonTable.hasDataMapSchema) {
+ if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
throw new UnsupportedOperationException(
"Update operation is not supported for tables which have a pre-aggregate table. Drop " +
"pre-aggregate tables to continue.")
@@ -299,7 +322,7 @@ object DeletePreAggregatePreListener extends OperationEventListener {
val tableEvent = event.asInstanceOf[DeleteFromTablePreEvent]
val carbonTable = tableEvent.carbonTable
if (carbonTable != null) {
- if (carbonTable.hasDataMapSchema) {
+ if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
throw new UnsupportedOperationException(
"Delete operation is not supported for tables which have a pre-aggregate table. Drop " +
"pre-aggregate tables to continue.")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 851b851..5ad5308 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.TableInfo
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -544,8 +544,10 @@ object PreAggregateUtil {
def createChildSelectQuery(tableSchema: TableSchema, databaseName: String): String = {
val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
- tableSchema.getListOfColumns.asScala.foreach {
- a => if (a.getAggFunction.nonEmpty) {
+ val columns = tableSchema.getListOfColumns.asScala
+ .filter(f => !f.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+ columns.foreach { a =>
+ if (a.getAggFunction.nonEmpty) {
aggregateColumns += s"${a.getAggFunction match {
case "count" => "sum"
case _ => a.getAggFunction}}(${a.getColumnName})"
@@ -558,4 +560,101 @@ object PreAggregateUtil {
groupingExpressions.mkString(",") }"
}
+ /**
+ * Below method will be used to get the select query when the rollup policy is
+ * applied in case of a timeseries table
+ * @param tableSchema
+ * schema of the data map table being loaded
+ * @param selectedDataMapSchema
+ * selected data map schema for rollup
+ * @param databaseName
+ * database name
+ * @return select query based on rollup
+ */
+ def createTimeseriesSelectQueryForRollup(
+ tableSchema: TableSchema,
+ selectedDataMapSchema: AggregationDataMapSchema,
+ databaseName: String): String = {
+ val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
+ val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
+ val columns = tableSchema.getListOfColumns.asScala
+ .filter(f => !f.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+ columns.foreach { a =>
+ if (a.getAggFunction.nonEmpty) {
+ aggregateColumns += s"${a.getAggFunction match {
+ case "count" => "sum"
+ case others@_ => others}}(${selectedDataMapSchema.getAggChildColByParent(
+ a.getParentColumnTableRelations.get(0).getColumnName, a.getAggFunction).getColumnName})"
+ } else if (a.getTimeSeriesFunction.nonEmpty) {
+ groupingExpressions += s"timeseries(${
+ selectedDataMapSchema
+ .getNonAggChildColBasedByParent(a.getParentColumnTableRelations.
+ get(0).getColumnName).getColumnName
+ } , '${ a.getTimeSeriesFunction }')"
+ } else {
+ groupingExpressions += selectedDataMapSchema
+ .getNonAggChildColBasedByParent(a.getParentColumnTableRelations.
+ get(0).getColumnName).getColumnName
+ }
+ }
+ s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",")
+ } from $databaseName.${selectedDataMapSchema.getChildSchema.getTableName } " +
+ s"group by ${ groupingExpressions.mkString(",") }"
+ }
+
+ /**
+ * Below method will be used to create the select query for the lowest
+ * timeseries aggregation level (for example second level); in that case it
+ * hits the main table
+ * @param tableSchema
+ * data map schema
+ * @param parentTableName
+ * parent table name
+ * @param databaseName
+ * database name
+ * @return select query for loading
+ */
+ def createTimeSeriesSelectQueryFromMain(tableSchema: TableSchema,
+ parentTableName: String,
+ databaseName: String): String = {
+ val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
+ val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
+ val columns = tableSchema.getListOfColumns.asScala
+ .filter(f => !f.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+ columns.foreach {a =>
+ if (a.getAggFunction.nonEmpty) {
+ aggregateColumns +=
+ s"${ a.getAggFunction }(${ a.getParentColumnTableRelations.get(0).getColumnName })"
+ } else if (a.getTimeSeriesFunction.nonEmpty) {
+ groupingExpressions +=
+ s"timeseries(${ a.getParentColumnTableRelations.get(0).getColumnName },'${
+ a.getTimeSeriesFunction}')"
+ } else {
+ groupingExpressions += a.getParentColumnTableRelations.get(0).getColumnName
+ }
+ }
+ s"select ${ groupingExpressions.mkString(",") },${
+ aggregateColumns.mkString(",")
+ } from $databaseName.${ parentTableName } group by ${ groupingExpressions.mkString(",") }"
+
+ }
+ /**
+ * Below method will be used to select rollup table in case of
+ * timeseries data map loading
+ * @param list
+ * list of timeseries datamap
+ * @param dataMapSchema
+ * datamap schema
+ * @return selected data map schema for rollup, if any
+ */
+ def getRollupDataMapNameForTimeSeries(
+ list: scala.collection.mutable.ListBuffer[AggregationDataMapSchema],
+ dataMapSchema: AggregationDataMapSchema): Option[AggregationDataMapSchema] = {
+ if (list.isEmpty) {
+ None
+ } else {
+ val rollupDataMapSchema = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema]
+ list.foreach { f =>
+ if (dataMapSchema.canSelectForRollup(f)) {
+ rollupDataMapSchema += f
+ }
+ }
+ rollupDataMapSchema.lastOption
+ }
+ }
}
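
For the mainTable/agg0 schema used in the tests above, the two builders produce queries of roughly the following shapes; the child column names (maintable_mytime, maintable_age_sum) and the default database are assumptions for illustration only:

    // hour-level table loaded by rolling up the already-loaded minute-level table
    val rollupQuery =
      """select timeseries(maintable_mytime, 'hour'), sum(maintable_age_sum)
        |from default.maintable_agg0_minute
        |group by timeseries(maintable_mytime, 'hour')""".stripMargin

    // the lowest (second) level always loads from the main table
    val fromMainQuery =
      """select timeseries(mytime, 'second'), sum(age)
        |from default.maintable
        |group by timeseries(mytime, 'second')""".stripMargin
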
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala
new file mode 100644
index 0000000..ad9ace7
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.timeseries
+
+import java.sql.Timestamp
+
+import org.apache.carbondata.core.preagg.TimeSeriesUDF
+
+/**
+ * Time series udf class
+ */
+class TimeSeriesFunction extends Function2[Timestamp, String, Timestamp] with Serializable {
+
+ override def apply(v1: Timestamp, v2: String): Timestamp = {
+ TimeSeriesUDF.INSTANCE.applyUDF(v1, v2)
+ }
+}
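
Because the class is a plain Function2, CarbonEnv can register it directly as a Spark UDF, after which it is callable from SQL. A usage sketch, mirroring the CarbonEnv registration shown earlier (sparkSession is assumed to be an active SparkSession in scope):

    import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction

    // sparkSession: an active SparkSession (assumed in scope)
    sparkSession.udf.register("timeseries", new TimeSeriesFunction)
    sparkSession.sql(
      "select timeseries(mytime, 'hour'), sum(age) " +
      "from mainTable group by timeseries(mytime, 'hour')").show()
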
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
index 9d4ce56..6a4ef56 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
@@ -16,8 +16,6 @@
*/
package org.apache.spark.sql.execution.command.timeseries
-import java.sql.Timestamp
-
import org.apache.spark.sql.execution.command.{DataMapField, Field}
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -142,18 +140,5 @@ object TimeSeriesUtil {
obj._2.aggregateFunction.isEmpty)
isTimeSeriesColumnExits.get._2.aggregateFunction = timeSeriesFunction
}
-
- /**
- * UDF for timeseries
- *
- * @param timestamp
- * timestamp
- * @param timeSeriesFunctionType
- * time series function
- * @return updated timestamp based on function
- */
- def timeSeriesUDF(timestamp: Timestamp, timeSeriesFunctionType: String): Timestamp = {
- TimeSeriesUDF.INSTANCE.applyUDF(timestamp, timeSeriesFunctionType)
- }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index abc58ff..f7a1eed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSes
import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.util.CarbonReflectionUtils
@@ -61,7 +61,8 @@ case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
case class CarbonMetaData(dims: Seq[String],
msrs: Seq[String],
carbonTable: CarbonTable,
- dictionaryMap: DictionaryMap)
+ dictionaryMap: DictionaryMap,
+ hasAggregateDataMapSchema: Boolean)
case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
def get(name: String): Option[Boolean] = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 09e66de..4227dcb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -22,22 +22,22 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql._
-import org.apache.spark.sql.CarbonExpressions.CarbonSubqueryAlias
+import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, NamedExpression, ScalaUDF, SortOrder}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.sql.CarbonExpressions.MatchCast
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
+import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.spark.util.CarbonScalaUtil
/**
@@ -69,6 +69,9 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
* 5. Order By Query rules.
* 5.1 Update project list based on updated aggregate expression
* 5.2 Update sort order attributes based on pre aggregate table
+ * 6. Timeseries function rules.
+ * 6.1 Validate that the main table has a timeseries datamap
+ * 6.2 Validate that the timeseries function is a supported function
*
* @param sparkSession
* spark session
@@ -115,8 +118,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// only carbon query plan is supported, checking whether the logical relation
// is for carbon
if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
- .hasDataMapSchema =>
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
// if it is valid plan then extract the query columns
isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
@@ -136,8 +139,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// only carbon query plan is supported, checking whether the logical relation
// is for carbon
if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
- .hasDataMapSchema =>
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
// if it is valid plan then extract the query columns
isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
@@ -148,11 +151,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
// getting the columns from filter expression
if(isValidPlan) {
- filterExp.transform {
- case attr: AttributeReference =>
- list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true)
- attr
- }
+ isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
}
carbonTable
@@ -162,8 +161,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// only carbon query plan is supported, checking whether the logical relation
// is for carbon
if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
- .hasDataMapSchema =>
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
// if it is valid plan then extract the query columns
isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
@@ -180,8 +179,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
aggregateExp,
CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
- .hasDataMapSchema =>
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
aggregateExp,
@@ -201,8 +200,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
aggregateExp,
Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
- .hasDataMapSchema =>
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
aggregateExp,
@@ -213,11 +212,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
if (isValidPlan) {
list ++=
extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
- filterExp.transform {
- case attr: AttributeReference =>
- list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true)
- attr
- }
+ isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
}
carbonTable
// case for handling aggregation with order by when only projection column exits
@@ -227,8 +222,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
aggregateExp,
CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
- .hasDataMapSchema =>
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
aggregateExp,
@@ -248,8 +243,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
aggregateExp,
Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
- .hasDataMapSchema =>
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
aggregateExp,
@@ -261,11 +256,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
list ++= extractQueryColumnForOrderBy(sortOrders = sortOrders,
carbonTable = carbonTable,
tableName = tableName)
- filterExp.transform {
- case attr: AttributeReference =>
- list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true)
- attr
- }
+ isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
}
carbonTable
case _ =>
@@ -321,6 +312,65 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
}
/**
+ * Below method will be used to extract the query columns from
+ * the filter expression
+ * @param filterExp
+ * filter expression
+ * @param set
+ * query column list
+ * @param carbonTable
+ * parent table
+ * @param tableName
+ * table name
+ * @return whether the filter expression is valid for aggregate table selection
+ */
+ def extractQueryColumnFromFilterExp(filterExp: Expression,
+ set: scala.collection.mutable.HashSet[QueryColumn],
+ carbonTable: CarbonTable, tableName: String): Boolean = {
+ // map from each attribute reference present in the filter to the timeseries
+ // function applied on it (if any); maintained to avoid adding duplicate columns
+ val mapOfColumnSeriesFun = scala.collection.mutable.HashMap.empty[AttributeReference, String]
+ var isValidPlan = true
+ filterExp.transform {
+ case attr: AttributeReference =>
+ if (!mapOfColumnSeriesFun.contains(attr)) {
+ mapOfColumnSeriesFun.put(attr, null)
+ }
+ attr
+ case udf@CarbonScalaUDF(_) =>
+ // for handling timeseries function
+ if (udf.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase(
+ "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction") &&
+ CarbonUtil.hasTimeSeriesDataMap(carbonTable)) {
+ mapOfColumnSeriesFun.put(udf.children(0).asInstanceOf[AttributeReference],
+ udf.children(1).asInstanceOf[Literal].value.toString)
+ } else {
+ // for any other scala udf
+ udf.transform {
+ case attr: AttributeReference =>
+ if (!mapOfColumnSeriesFun.contains(attr)) {
+ mapOfColumnSeriesFun.put(attr, null)
+ }
+ attr
+ }
+ }
+ udf
+ }
+ mapOfColumnSeriesFun.foreach { f =>
+ if (f._2 == null) {
+ set +=
+ getQueryColumn(f._1.name, carbonTable, tableName, isFilterColumn = true)
+ } else {
+ set += getQueryColumn(f._1.name,
+ carbonTable,
+ carbonTable.getTableName,
+ isFilterColumn = true,
+ timeseriesFunction = f._2)
+ }
+ }
+ isValidPlan
+ }
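As a worked illustration with hypothetical column names: for a filter such as timeseries(order_time, 'hour') = '2017-12-01 10:00:00' and price > 100, the transform above leaves mapOfColumnSeriesFun holding order_time -> "hour" and price -> null; the trailing foreach then emits one filter QueryColumn carrying the timeseries function and one plain filter QueryColumn, with duplicates naturally collapsed by the map.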
+ /**
* Below method will be used to extract columns from order by expression
* @param projectList
* project list from plan
@@ -383,13 +433,18 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
attributeReference: AttributeReference,
attributes: Seq[AttributeReference],
aggFunction: String = "",
- canBeNull: Boolean = false): AttributeReference = {
+ canBeNull: Boolean = false,
+ timeseriesFunction: String = ""): AttributeReference = {
val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema]
- val columnSchema = if (aggFunction.isEmpty) {
+ val columnSchema = if (aggFunction.isEmpty && timeseriesFunction.isEmpty) {
aggregationDataMapSchema.getChildColByParentColName(attributeReference.name.toLowerCase)
- } else {
+ } else if (!aggFunction.isEmpty) {
aggregationDataMapSchema.getAggChildColByParent(attributeReference.name.toLowerCase,
- aggFunction.toLowerCase)
+ aggFunction)
+ } else {
+ aggregationDataMapSchema
+ .getTimeseriesChildColByParent(attributeReference.name.toLowerCase,
+ timeseriesFunction)
}
// here column schema cannot be null, if it is null then aggregate table selection
// logic has some problem
@@ -427,6 +482,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
* 5. Order by plan rules.
* 5.1 Update project list based on updated aggregate expression
* 5.2 Update sort order attributes based on pre aggregate table
+ * 6. Timeseries function rules.
+ * 6.1 Validate that the parent table has a timeseries datamap
+ * 6.2 Validate that the timeseries function is a supported function
*
* @param logicalPlan
* parent logical plan
@@ -444,7 +502,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// case for aggregation query
case Aggregate(grExp, aggExp, child@CarbonSubqueryAlias(_, l: LogicalRelation))
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+ l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (updatedGroupExp, updatedAggExp, newChild, None) =
getUpdatedExpressions(grExp,
aggExp,
@@ -461,7 +520,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
aggExp,
Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation)))
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+ l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
getUpdatedExpressions(grExp,
aggExp,
@@ -477,7 +537,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// case for aggregation query
case Aggregate(grExp, aggExp, l: LogicalRelation)
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+ l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (updatedGroupExp, updatedAggExp, newChild, None) =
getUpdatedExpressions(grExp,
aggExp,
@@ -497,7 +558,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
aggregateExp,
subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+ l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (updatedGroupExp, updatedAggExp, newChild, None) =
getUpdatedExpressions(groupingExp,
aggregateExp,
@@ -520,7 +582,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
aggregateExp,
Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))))
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+ l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
getUpdatedExpressions(groupingExp,
aggregateExp,
@@ -544,8 +607,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
aggregateExp,
subQuery@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
- .hasDataMapSchema =>
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (updatedGroupExp, updatedAggExp, newChild, None) =
getUpdatedExpressions(groupingExp,
aggregateExp,
@@ -566,7 +629,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
aggregateExp,
Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+ l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
getUpdatedExpressions(groupingExp,
aggregateExp,
@@ -615,6 +679,11 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
alias.qualifier,
alias.isGenerated)
alias
+ case alias@Alias(exp: Expression, name) =>
+ updatedProjectList += AttributeReference(name, exp.dataType, exp.nullable)(alias.exprId,
+ alias.qualifier,
+ alias.isGenerated)
+ alias
}
}
// getting the updated sort order
@@ -724,6 +793,31 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
Alias(aggExp,
name)(NamedExpression.newExprId,
alias.qualifier).asInstanceOf[NamedExpression]
+ case alias@Alias(expression: Expression, name) =>
+ val updatedExp =
+ if (expression.isInstanceOf[ScalaUDF] &&
+ expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase(
+ "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction")) {
+ expression.asInstanceOf[ScalaUDF].transform {
+ case attr: AttributeReference =>
+ val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
+ attr,
+ attributes,
+ timeseriesFunction =
+ expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal].value.toString)
+ childAttributeReference
+ }
+ } else {
+ expression.transform {
+ case attr: AttributeReference =>
+ val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
+ attr,
+ attributes)
+ childAttributeReference
+ }
+ }
+ Alias(updatedExp, name)(NamedExpression.newExprId,
+ alias.qualifier).asInstanceOf[NamedExpression]
}
// transforming the logical relation
val newChild = child.transform {
@@ -763,11 +857,18 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// in case of alias we need to match with alias name and when alias is not present
// we need to compare with attribute reference name
case alias@Alias(attr: AttributeReference, name)
- if attr.name.equals(sortOrderAttr.name) || name.equals(sortOrderAttr.name) =>
+ if attr.name.equalsIgnoreCase(sortOrderAttr.name) ||
+ name.equalsIgnoreCase(sortOrderAttr.name) =>
AttributeReference(name,
- attr.dataType,
- attr.nullable,
- attr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated)
+ sortOrderAttr.dataType,
+ sortOrderAttr.nullable,
+ sortOrderAttr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated)
+ case alias@Alias(_: Expression, name)
+ if name.equalsIgnoreCase(sortOrderAttr.name) =>
+ AttributeReference(name,
+ sortOrderAttr.dataType,
+ sortOrderAttr.nullable,
+ sortOrderAttr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated)
}
// in any case one of the conditions above will match, so there is no need to
// check whether the updated expression is empty or not
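For a hypothetical query select timeseries(order_time,'hour') as ts, sum(price) from sales group by timeseries(order_time,'hour') order by ts, the sort attribute ts now matches the alias name case-insensitively, and the rebuilt AttributeReference takes its data type and nullability from the pre-aggregate table's sort order attribute rather than from the original alias child.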
@@ -933,7 +1034,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
case attr: AttributeReference =>
set += getQueryColumn(attr.name,
carbonTable,
- tableName);
+ tableName)
case Alias(attr: AttributeReference, _) =>
set += getQueryColumn(attr.name,
carbonTable,
@@ -950,6 +1051,26 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
} else {
return false
}
+ case Alias(expression: Expression, _) =>
+ if (expression.isInstanceOf[ScalaUDF] &&
+ expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase(
+ "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction") &&
+ CarbonUtil.hasTimeSeriesDataMap(carbonTable)) {
+ set += getQueryColumn(expression.asInstanceOf[ScalaUDF].children(0)
+ .asInstanceOf[AttributeReference].name,
+ carbonTable,
+ tableName,
+ timeseriesFunction = expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal]
+ .value.toString)
+ } else {
+ expression.transform {
+ case attr: AttributeReference =>
+ set += getQueryColumn(attr.name,
+ carbonTable,
+ tableName)
+ attr
+ }
+ }
}
true
}
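Continuing the hypothetical query select timeseries(order_time,'hour'), sum(price) from sales group by timeseries(order_time,'hour'): the new Alias branch above recognises the timeseries ScalaUDF (provided the table really has a timeseries datamap) and records a QueryColumn for order_time tagged with the function 'hour', while sum(price) flows through the existing aggregate-expression handling; the collected column set then drives AggregateTableSelector.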
@@ -1048,6 +1169,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
}
}
+
+
/**
* Below method will be used to get the query column object which
* will have details of the column and its property
@@ -1074,7 +1197,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
aggFunction: String = "",
dataType: String = "",
isChangedDataType: Boolean = false,
- isFilterColumn: Boolean = false): QueryColumn = {
+ isFilterColumn: Boolean = false,
+ timeseriesFunction: String = ""): QueryColumn = {
val columnSchema = carbonTable.getColumnByName(tableName, columnName.toLowerCase)
if(null == columnSchema) {
null
@@ -1083,11 +1207,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
new QueryColumn(columnSchema.getColumnSchema,
columnSchema.getDataType.getName,
aggFunction.toLowerCase,
- isFilterColumn)
+ isFilterColumn,
+ timeseriesFunction.toLowerCase)
} else {
new QueryColumn(columnSchema.getColumnSchema,
- CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType),
- aggFunction.toLowerCase, isFilterColumn)
+ CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType),
+ aggFunction.toLowerCase,
+ isFilterColumn,
+ timeseriesFunction.toLowerCase)
}
}
}
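A hedged sketch of the call shape for the timeseries case (argument values illustrative):

    // a filter column reached through timeseries(order_time, 'hour')
    val queryColumn = getQueryColumn("order_time", carbonTable,
      carbonTable.getTableName, isFilterColumn = true, timeseriesFunction = "hour")
    // the returned QueryColumn carries the column schema plus the lower-cased
    // timeseries function, which AggregateTableSelector matches against each
    // datamap's child columns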