You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 19:16:42 UTC
[2/2] carbondata git commit: [CARBONDATA-1523]Pre Aggregate table
selection and Query Plan changes
[CARBONDATA-1523]Pre Aggregate table selection and Query Plan changes
This closes #1464
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dda2573a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dda2573a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dda2573a
Branch: refs/heads/pre-aggregate
Commit: dda2573a1160ba0d97047693c4b094bc6da06d95
Parents: c1eefee
Author: kumarvishal <ku...@gmail.com>
Authored: Mon Oct 30 12:44:32 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Nov 14 00:46:24 2017 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 2 +
.../ThriftWrapperSchemaConverterImpl.java | 7 +-
.../schema/table/AggregationDataMapSchema.java | 212 +++++
.../core/metadata/schema/table/CarbonTable.java | 16 +-
.../metadata/schema/table/DataMapSchema.java | 19 +-
.../schema/table/DataMapSchemaFactory.java | 38 +
.../core/metadata/schema/table/TableInfo.java | 7 +-
.../core/metadata/schema/table/TableSchema.java | 5 +-
.../core/preagg/AggregateTableSelector.java | 135 +++
.../carbondata/core/preagg/QueryColumn.java | 70 ++
.../carbondata/core/preagg/QueryPlan.java | 59 ++
.../TestPreAggregateTableSelection.scala | 175 ++++
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 7 +
.../spark/rdd/CarbonDataRDDFactory.scala | 2 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 3 +
.../CreatePreAggregateTableCommand.scala | 12 +-
.../preaaggregate/PreAggregateListeners.scala | 24 +-
.../preaaggregate/PreAggregateUtil.scala | 184 ++--
.../spark/sql/hive/CarbonAnalysisRules.scala | 101 +--
.../sql/hive/CarbonPreAggregateRules.scala | 829 +++++++++++++++++++
.../spark/sql/hive/CarbonSessionState.scala | 6 +-
.../sql/parser/CarbonSpark2SqlParser.scala | 8 +
.../spark/sql/parser/CarbonSparkSqlParser.scala | 3 +-
23 files changed, 1709 insertions(+), 215 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index cb0b5c3..f17a9e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1411,6 +1411,8 @@ public final class CarbonCommonConstants {
public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
+ public static final String AGGREGATIONDATAMAPSCHEMA = "AggregateDataMapHandler";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index b914e06..fef2e0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaFactory;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
@@ -628,10 +629,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
@Override public DataMapSchema fromExternalToWrapperDataMapSchema(
org.apache.carbondata.format.DataMapSchema thriftDataMapSchema) {
- DataMapSchema childSchema =
- new DataMapSchema(thriftDataMapSchema.getDataMapName(), thriftDataMapSchema.getClassName());
+ DataMapSchema childSchema = DataMapSchemaFactory.INSTANCE
+ .getDataMapSchema(thriftDataMapSchema.getDataMapName(), thriftDataMapSchema.getClassName());
childSchema.setProperties(thriftDataMapSchema.getProperties());
- if (thriftDataMapSchema.getRelationIdentifire() != null) {
+ if (null != thriftDataMapSchema.getRelationIdentifire()) {
RelationIdentifier relationIdentifier =
new RelationIdentifier(thriftDataMapSchema.getRelationIdentifire().getDatabaseName(),
thriftDataMapSchema.getRelationIdentifire().getTableName(),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
new file mode 100644
index 0000000..87c07f4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+
+/**
+ * data map schema class for pre aggregation
+ */
+public class AggregationDataMapSchema extends DataMapSchema {
+
+ /**
+ * map of parent column name to set of child columns without
+ * aggregation function
+ */
+ private Map<String, Set<ColumnSchema>> parentToNonAggChildMapping;
+
+ /**
+ * map of parent column name to set of child columns with
+ * aggregation function
+ */
+ private Map<String, Set<ColumnSchema>> parentToAggChildMapping;
+
+ /**
+ * map of parent column name to set of aggregation functions applied
+ * on parent column
+ */
+ private Map<String, Set<String>> parentColumnToAggregationsMapping;
+
+ public AggregationDataMapSchema(String dataMapName, String className) {
+ super(dataMapName, className);
+ }
+
+ public void setChildSchema(TableSchema childSchema) {
+ super.setChildSchema(childSchema);
+ List<ColumnSchema> listOfColumns = getChildSchema().getListOfColumns();
+ fillNonAggFunctionColumns(listOfColumns);
+ fillAggFunctionColumns(listOfColumns);
+ fillParentNameToAggregationMapping(listOfColumns);
+ }
+
+ /**
+ * Below method will be used to get the columns on which aggregate function is not applied
+ * @param columnName
+ * parent column name
+ * @return child column schema
+ */
+ public ColumnSchema getNonAggChildColBasedByParent(String columnName) {
+ Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName);
+ if (null != columnSchemas) {
+ Iterator<ColumnSchema> iterator = columnSchemas.iterator();
+ while (iterator.hasNext()) {
+ ColumnSchema next = iterator.next();
+ if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) {
+ return next;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Below method will be used to get the column schema based on parent column name
+ * @param columName
+ * parent column name
+ * @return child column schema
+ */
+ public ColumnSchema getChildColByParentColName(String columName) {
+ List<ColumnSchema> listOfColumns = childSchema.getListOfColumns();
+ for (ColumnSchema columnSchema : listOfColumns) {
+ List<ParentColumnTableRelation> parentColumnTableRelations =
+ columnSchema.getParentColumnTableRelations();
+ if (parentColumnTableRelations.get(0).getColumnName().equals(columName)) {
+ return columnSchema;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Below method will be used to get the child column schema based on parent name and aggregate
+ * function applied on column
+ * @param columnName
+ * parent column name
+ * @param aggFunction
+ * aggregate function applied
+ * @return child column schema
+ */
+ public ColumnSchema getAggChildColByParent(String columnName,
+ String aggFunction) {
+ Set<ColumnSchema> columnSchemas = parentToAggChildMapping.get(columnName);
+ if (null != columnSchemas) {
+ Iterator<ColumnSchema> iterator = columnSchemas.iterator();
+ while (iterator.hasNext()) {
+ ColumnSchema next = iterator.next();
+ if (null != next.getAggFunction() && next.getAggFunction().equalsIgnoreCase(aggFunction)) {
+ return next;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Below method is to check whether a parent column with matching aggregate function exists
+ * @param parentColumnName
+ * parent column name
+ * @param aggFunction
+ * aggregate function
+ * @return is matching
+ */
+ public boolean isColumnWithAggFunctionExists(String parentColumnName, String aggFunction) {
+ Set<String> aggFunctions = parentColumnToAggregationsMapping.get(parentColumnName);
+ if (null != aggFunctions && aggFunctions.contains(aggFunction)) {
+ return true;
+ }
+ return false;
+ }
+
+
+ /**
+ * Method to prepare mapping of parent to list of aggregation function applied on that column
+ * @param listOfColumns
+ * child column schema list
+ */
+ private void fillParentNameToAggregationMapping(List<ColumnSchema> listOfColumns) {
+ parentColumnToAggregationsMapping = new HashMap<>();
+ for (ColumnSchema column : listOfColumns) {
+ if (null != column.getAggFunction() && !column.getAggFunction().isEmpty()) {
+ List<ParentColumnTableRelation> parentColumnTableRelations =
+ column.getParentColumnTableRelations();
+ if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1) {
+ String columnName = column.getParentColumnTableRelations().get(0).getColumnName();
+ Set<String> aggFunctions = parentColumnToAggregationsMapping.get(columnName);
+ if (null == aggFunctions) {
+ aggFunctions = new HashSet<>();
+ parentColumnToAggregationsMapping.put(columnName, aggFunctions);
+ }
+ aggFunctions.add(column.getAggFunction());
+ }
+ }
+ }
+ }
+
+ /**
+ * Below method will be used to prepare mapping between parent column and non aggregation function
+ * columns
+ * @param listOfColumns
+ * list of child columns
+ */
+ private void fillNonAggFunctionColumns(List<ColumnSchema> listOfColumns) {
+ parentToNonAggChildMapping = new HashMap<>();
+ for (ColumnSchema column : listOfColumns) {
+ if (null == column.getAggFunction() || column.getAggFunction().isEmpty()) {
+ fillMappingDetails(column, parentToNonAggChildMapping);
+ }
+ }
+ }
+
+ private void fillMappingDetails(ColumnSchema column,
+ Map<String, Set<ColumnSchema>> map) {
+ List<ParentColumnTableRelation> parentColumnTableRelations =
+ column.getParentColumnTableRelations();
+ if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1) {
+ String columnName = column.getParentColumnTableRelations().get(0).getColumnName();
+ Set<ColumnSchema> columnSchemas = map.get(columnName);
+ if (null == columnSchemas) {
+ columnSchemas = new HashSet<>();
+ map.put(columnName, columnSchemas);
+ }
+ columnSchemas.add(column);
+ }
+ }
+
+ /**
+ * Below method will be used to fill parent to list of aggregation column mapping
+ * @param listOfColumns
+ * list of child columns
+ */
+ private void fillAggFunctionColumns(List<ColumnSchema> listOfColumns) {
+ parentToAggChildMapping = new HashMap<>();
+ for (ColumnSchema column : listOfColumns) {
+ if (null != column.getAggFunction() && !column.getAggFunction().isEmpty()) {
+ fillMappingDetails(column, parentToAggChildMapping);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index ca0952d..0fd9fbf 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -126,6 +126,8 @@ public class CarbonTable implements Serializable {
private int dimensionOrdinalMax;
+ private boolean hasDataMapSchema;
+
private CarbonTable() {
this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
@@ -158,6 +160,8 @@ public class CarbonTable implements Serializable {
table.tablePartitionMap.put(tableInfo.getFactTable().getTableName(),
tableInfo.getFactTable().getPartitionInfo());
}
+ table.hasDataMapSchema =
+ null != tableInfo.getDataMapSchemaList() && tableInfo.getDataMapSchemaList().size() > 0;
return table;
}
@@ -702,13 +706,13 @@ public class CarbonTable implements Serializable {
this.dimensionOrdinalMax = dimensionOrdinalMax;
}
- public boolean isPreAggregateTable() {
- return tableInfo.getParentRelationIdentifiers() != null && !tableInfo
- .getParentRelationIdentifiers().isEmpty();
+
+ public boolean hasDataMapSchema() {
+ return hasDataMapSchema;
}
- public boolean hasPreAggregateTables() {
- return tableInfo.getDataMapSchemaList() != null && !tableInfo
- .getDataMapSchemaList().isEmpty();
+ public boolean isChildDataMap() {
+ return null != tableInfo.getParentRelationIdentifiers()
+ && !tableInfo.getParentRelationIdentifiers().isEmpty();
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index e0632d9..5a9017b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -30,20 +30,20 @@ public class DataMapSchema implements Serializable, Writable {
private static final long serialVersionUID = 6577149126264181553L;
- private String dataMapName;
+ protected String dataMapName;
private String className;
- private RelationIdentifier relationIdentifier;
+ protected RelationIdentifier relationIdentifier;
/**
* child table schema
*/
- private TableSchema childSchema;
+ protected TableSchema childSchema;
/**
* relation properties
*/
- private Map<String, String> properties;
+ protected Map<String, String> properties;
public DataMapSchema() {
}
@@ -69,6 +69,10 @@ public class DataMapSchema implements Serializable, Writable {
return properties;
}
+ public String getDataMapName() {
+ return dataMapName;
+ }
+
public void setRelationIdentifier(RelationIdentifier relationIdentifier) {
this.relationIdentifier = relationIdentifier;
}
@@ -81,10 +85,6 @@ public class DataMapSchema implements Serializable, Writable {
this.properties = properties;
}
- public String getDataMapName() {
- return dataMapName;
- }
-
@Override public void write(DataOutput out) throws IOException {
out.writeUTF(dataMapName);
out.writeUTF(className);
@@ -114,7 +114,7 @@ public class DataMapSchema implements Serializable, Writable {
this.className = in.readUTF();
boolean isRelationIdnentifierExists = in.readBoolean();
if (isRelationIdnentifierExists) {
- this.relationIdentifier = new RelationIdentifier();
+ this.relationIdentifier = new RelationIdentifier(null, null, null);
this.relationIdentifier.readFields(in);
}
boolean isChildSchemaExists = in.readBoolean();
@@ -130,6 +130,5 @@ public class DataMapSchema implements Serializable, Writable {
String value = in.readUTF();
this.properties.put(key, value);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
new file mode 100644
index 0000000..5729959
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata.schema.table;
+
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA;
+
+public class DataMapSchemaFactory {
+ public static final DataMapSchemaFactory INSTANCE = new DataMapSchemaFactory();
+
+ /**
+ * Below method will be used to get data map schema object
+ * based on class name
+ * @param className
+ * @return data map schema
+ */
+ public DataMapSchema getDataMapSchema(String dataMapName, String className) {
+ switch (className) {
+ case AGGREGATIONDATAMAPSCHEMA:
+ return new AggregationDataMapSchema(dataMapName, className);
+ default:
+ return new DataMapSchema(dataMapName, className);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 1d9e2ec..65878bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -295,7 +295,12 @@ public class TableInfo implements Serializable, Writable {
for (int i = 0; i < numberOfChildTable; i++) {
DataMapSchema childSchema = new DataMapSchema();
childSchema.readFields(in);
- dataMapSchemaList.add(childSchema);
+ DataMapSchema dataMapSchema = DataMapSchemaFactory.INSTANCE
+ .getDataMapSchema(childSchema.getDataMapName(), childSchema.getClassName());
+ dataMapSchema.setChildSchema(childSchema.getChildSchema());
+ dataMapSchema.setRelationIdentifier(childSchema.getRelationIdentifier());
+ dataMapSchema.setProperties(childSchema.getProperties());
+ dataMapSchemaList.add(dataMapSchema);
}
}
boolean isParentTableRelationIndentifierExists = in.readBoolean();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
index 714e0d8..03848d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
@@ -263,9 +263,10 @@ public class TableSchema implements Serializable, Writable {
Map<String, String> properties = new HashMap<>();
properties.put("CHILD_SELECT QUERY", queryString);
properties.put("QUERYTYPE", queryType);
- DataMapSchema dataMapSchema = new DataMapSchema(dataMapName, className);
- dataMapSchema.setChildSchema(this);
+ DataMapSchema dataMapSchema =
+ new DataMapSchema(dataMapName, className);
dataMapSchema.setProperties(properties);
+ dataMapSchema.setChildSchema(this);
dataMapSchema.setRelationIdentifier(relationIdentifier);
return dataMapSchema;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
new file mode 100644
index 0000000..8b87a1a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.preagg;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+/**
+ * Below class will be used to select the aggregate table based on
+ * query plan. Rules for selecting the aggregate table are below:
+ * 1. Select all aggregate tables based on projection columns
+ * 2. Select aggregate tables based on filter expressions
+ * 3. Select aggregate tables based on aggregation columns
+ */
+public class AggregateTableSelector {
+
+ /**
+ * current query plan
+ */
+ private QueryPlan queryPlan;
+
+ /**
+ * parent table
+ */
+ private CarbonTable parentTable;
+
+ public AggregateTableSelector(QueryPlan queryPlan, CarbonTable parentTable) {
+ this.queryPlan = queryPlan;
+ this.parentTable = parentTable;
+ }
+
+ /**
+ * Below method will be used to select pre aggregate tables based on query plan
+ * Rules for selecting the aggregate table are below:
+ * 1. Select all aggregate tables based on projection columns
+ * 2. Select aggregate tables based on filter expressions
+ * 3. Select aggregate tables based on aggregation columns
+ *
+ * @return selected pre aggregate table schema
+ */
+ public List<DataMapSchema> selectPreAggDataMapSchema() {
+ List<QueryColumn> projectionColumn = queryPlan.getProjectionColumn();
+ List<QueryColumn> aggColumns = queryPlan.getAggregationColumns();
+ List<QueryColumn> filterColumns = queryPlan.getFilterColumns();
+ List<DataMapSchema> dataMapSchemaList = parentTable.getTableInfo().getDataMapSchemaList();
+ List<DataMapSchema> selectedDataMapSchema = new ArrayList<>();
+ boolean isMatch;
+ // match projection columns
+ if (null != projectionColumn && !projectionColumn.isEmpty()) {
+ for (DataMapSchema dmSchema : dataMapSchemaList) {
+ AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
+ isMatch = true;
+ for (QueryColumn queryColumn : projectionColumn) {
+ ColumnSchema columnSchemaByParentName = aggregationDataMapSchema
+ .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName());
+ if (null == columnSchemaByParentName) {
+ isMatch = false;
+ }
+ }
+ if (isMatch) {
+ selectedDataMapSchema.add(dmSchema);
+ }
+ }
+ // if projection columns are present but no aggregate table matched, return the empty list
+ if (selectedDataMapSchema.size() == 0) {
+ return selectedDataMapSchema;
+ }
+ }
+
+ // match filter columns
+ if (null != filterColumns && !filterColumns.isEmpty()) {
+ List<DataMapSchema> dmSchemaToIterate =
+ selectedDataMapSchema.isEmpty() ? dataMapSchemaList : selectedDataMapSchema;
+ selectedDataMapSchema = new ArrayList<>();
+ for (DataMapSchema dmSchema : dmSchemaToIterate) {
+ isMatch = true;
+ for (QueryColumn queryColumn : filterColumns) {
+ AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
+ ColumnSchema columnSchemaByParentName = aggregationDataMapSchema
+ .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName());
+ if (null == columnSchemaByParentName) {
+ isMatch = false;
+ }
+ }
+ if (isMatch) {
+ selectedDataMapSchema.add(dmSchema);
+ }
+ }
+ // if filter column is present and selection size is zero then return
+ if (selectedDataMapSchema.size() == 0) {
+ return selectedDataMapSchema;
+ }
+ }
+ // match aggregation columns
+ if (null != aggColumns && !aggColumns.isEmpty()) {
+ List<DataMapSchema> dmSchemaToIterate =
+ selectedDataMapSchema.isEmpty() ? dataMapSchemaList : selectedDataMapSchema;
+ selectedDataMapSchema = new ArrayList<>();
+ for (DataMapSchema dmSchema : dmSchemaToIterate) {
+ isMatch = true;
+ for (QueryColumn queryColumn : aggColumns) {
+ AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
+ if (!aggregationDataMapSchema
+ .isColumnWithAggFunctionExists(queryColumn.getColumnSchema().getColumnName(),
+ queryColumn.getAggFunction())) {
+ isMatch = false;
+ }
+ }
+ if (isMatch) {
+ selectedDataMapSchema.add(dmSchema);
+ }
+ }
+ }
+ return selectedDataMapSchema;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
new file mode 100644
index 0000000..a62d556
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.preagg;
+
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+/**
+ * column present in query
+ */
+public class QueryColumn {
+
+ /**
+ * parent column schema
+ */
+ private ColumnSchema columnSchema;
+
+ /**
+ * to store the change data type in case of cast
+ */
+ private String changedDataType;
+
+ /**
+ * aggregation function applied
+ */
+ private String aggFunction;
+
+ /**
+ * is filter column
+ */
+ private boolean isFilterColumn;
+
+ public QueryColumn(ColumnSchema columnSchema, String changedDataType, String aggFunction,
+ boolean isFilterColumn) {
+ this.columnSchema = columnSchema;
+ this.changedDataType = changedDataType;
+ this.aggFunction = aggFunction;
+ this.isFilterColumn = isFilterColumn;
+ }
+
+ public ColumnSchema getColumnSchema() {
+ return columnSchema;
+ }
+
+ public String getChangedDataType() {
+ return changedDataType;
+ }
+
+ public String getAggFunction() {
+ return aggFunction;
+ }
+
+ public boolean isFilterColumn() {
+ return isFilterColumn;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
new file mode 100644
index 0000000..21a34fa
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.preagg;
+
+import java.util.List;
+
+/**
+ * class to maintain the query plan to select the data map tables
+ */
+public class QueryPlan {
+
+ /**
+ * List of projection columns
+ */
+ private List<QueryColumn> projectionColumn;
+
+ /**
+ * list of aggregation columns
+ */
+ private List<QueryColumn> aggregationColumns;
+
+ /**
+ * list of filter columns
+ */
+ private List<QueryColumn> filterColumns;
+
+ public QueryPlan(List<QueryColumn> projectionColumn, List<QueryColumn> aggregationColumns,
+ List<QueryColumn> filterColumns) {
+ this.projectionColumn = projectionColumn;
+ this.aggregationColumns = aggregationColumns;
+ this.filterColumns = filterColumns;
+ }
+
+ public List<QueryColumn> getProjectionColumn() {
+ return projectionColumn;
+ }
+
+ public List<QueryColumn> getAggregationColumns() {
+ return aggregationColumns;
+ }
+
+ public List<QueryColumn> getFilterColumns() {
+ return filterColumns;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
new file mode 100644
index 0000000..6b435c6
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, DataFrame}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll: Unit = {
+ sql("drop table if exists mainTable")
+ sql("drop table if exists agg0")
+ sql("drop table if exists agg1")
+ sql("drop table if exists agg2")
+ sql("drop table if exists agg3")
+ sql("drop table if exists agg4")
+ sql("drop table if exists agg5")
+ sql("drop table if exists agg6")
+ sql("drop table if exists agg7")
+ sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+ sql("create datamap agg0 on table mainTable using 'preaggregate' as select name from mainTable group by name")
+ sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name")
+ sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(id) from mainTable group by name")
+ sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,count(id) from mainTable group by name")
+ sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(age),count(id) from mainTable group by name")
+ sql("create datamap agg5 on table mainTable using 'preaggregate' as select name,avg(age) from mainTable group by name")
+ sql("create datamap agg6 on table mainTable using 'preaggregate' as select name,min(age) from mainTable group by name")
+ sql("create datamap agg7 on table mainTable using 'preaggregate' as select name,max(age) from mainTable group by name")
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+ }
+
+
+ test("test PreAggregate table selection 1") {
+ val df = sql("select name from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+ }
+
+ test("test PreAggregate table selection 2") {
+ val df = sql("select name from mainTable where name in (select name from mainTable) group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "mainTable")
+ }
+
+ test("test PreAggregate table selection 3") {
+ val df = sql("select name from mainTable where name in (select name from mainTable group by name) group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "mainTable")
+ }
+
+ test("test PreAggregate table selection 4") {
+ val df = sql("select name from mainTable where name in('vishal') group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+ }
+
+ test("test PreAggregate table selection 5") {
+ val df = sql("select name, sum(age) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+ }
+
+ test("test PreAggregate table selection 6") {
+ val df = sql("select sum(age) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+ }
+
+ test("test PreAggregate table selection 7") {
+ val df = sql("select sum(id) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg2")
+ }
+
+ test("test PreAggregate table selection 8") {
+ val df = sql("select count(id) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
+ }
+
+ test("test PreAggregate table selection 9") {
+ val df = sql("select sum(age), count(id) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
+ }
+
+ test("test PreAggregate table selection 10") {
+ val df = sql("select avg(age) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg5")
+ }
+
+ test("test PreAggregate table selection 11") {
+ val df = sql("select max(age) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg7")
+ }
+
+ test("test PreAggregate table selection 12") {
+ val df = sql("select min(age) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg6")
+ }
+
+ test("test PreAggregate table selection 13") {
+ val df = sql("select name, sum(age) from mainTable where city = 'Bangalore' group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "mainTable")
+ }
+
+ test("test PreAggregate table selection 14") {
+ val df = sql("select sum(age) from mainTable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+ }
+
+ test("test PreAggregate table selection 15") {
+ val df = sql("select avg(age) from mainTable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg5")
+ }
+
+ test("test PreAggregate table selection 16") {
+ val df = sql("select max(age) from mainTable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg7")
+ }
+
+ test("test PreAggregate table selection 17") {
+ val df = sql("select min(age) from mainTable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg6")
+ }
+
+ test("test PreAggregate table selection 18") {
+ val df = sql("select count(id) from mainTable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
+ }
+
+ def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={
+ var isValidPlan = false
+ plan.transform {
+ // first check whether the preAgg scala udf is present in the plan; if so the
+ // call is from the create preaggregate table class, so no need to transform the query plan
+ case logicalRelation:LogicalRelation =>
+ if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+ val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ if(relation.carbonTable.getFactTableName.equalsIgnoreCase(actualTableName)) {
+ isValidPlan = true
+ }
+ }
+ logicalRelation
+ }
+ if(!isValidPlan) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ }
+
+ override def afterAll: Unit = {
+ sql("drop table if exists mainTable")
+ sql("drop table if exists agg0")
+ sql("drop table if exists agg1")
+ sql("drop table if exists agg2")
+ sql("drop table if exists agg3")
+ sql("drop table if exists agg4")
+ sql("drop table if exists agg5")
+ sql("drop table if exists agg6")
+ sql("drop table if exists agg7")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 42447da..e83d96a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -173,6 +173,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
protected val DATAMAP = carbonKeyWord("DATAMAP")
protected val ON = carbonKeyWord("ON")
protected val DMPROPERTIES = carbonKeyWord("DMPROPERTIES")
+ protected val SELECT = carbonKeyWord("SELECT")
protected val doubleQuotedString = "\"([^\"]+)\"".r
protected val singleQuotedString = "'([^']+)'".r
@@ -989,6 +990,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
Field(e1, e2.dataType, Some(e1), e2.children, null, e3)
}
+ lazy val addPreAgg: Parser[String] =
+ SELECT ~> restInput <~ opt(";") ^^ {
+ case query =>
+ "select preAGG() as preAgg, " + query
+ }
+
protected lazy val primitiveFieldType: Parser[Field] =
primitiveTypes ^^ {
case e1 =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 9899be1..28dcbf2 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -463,7 +463,7 @@ object CarbonDataRDDFactory {
throw new Exception(status(0)._2._2.errorMsg)
}
// if segment is empty then fail the data load
- if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isPreAggregateTable &&
+ if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
// update the load entry in table status file for changing the status to failure
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index a37b55b..b69ef2f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -52,6 +52,9 @@ class CarbonEnv {
def init(sparkSession: SparkSession): Unit = {
sparkSession.udf.register("getTupleId", () => "")
+ // added to handle pre-aggregate table creation. When the user fires the create
+ // table DDL we add this udf to the plan so the PreAggregate rules are not applied.
+ sparkSession.udf.register("preAgg", () => "")
if (!initialized) {
// update carbon session parameters , preserve thread parameters
val currentThreadSesssionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index ebf6273..3a78968 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
/**
* Below command class will be used to create pre-aggregate table
* and updating the parent table about the child table information
@@ -47,9 +49,10 @@ case class CreatePreAggregateTableCommand(
}
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val df = sparkSession.sql(queryString)
- val fieldRelationMap = PreAggregateUtil
- .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, queryString)
+ val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
+ val df = sparkSession.sql(updatedQuery)
+ val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
+ df.logicalPlan, queryString)
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
dmproperties.foreach(t => tableProperties.put(t._1, t._2))
@@ -87,7 +90,8 @@ case class CreatePreAggregateTableCommand(
val tableInfo = relation.tableMeta.carbonTable.getTableInfo
// child schema object which will be updated on parent table about the
val childSchema = tableInfo.getFactTable.buildChildSchema(
- dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")
+ dataMapName, CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
+ tableInfo.getDatabaseName, queryString, "AGGREGATION")
dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
// updating the parent table about child table
PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 8271e57..7a66e88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -105,7 +105,7 @@ object PreAggregateDataTypeChangePreListener extends OperationEventListener {
}
}
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(s"Cannot change data type for columns in " +
s"pre-aggreagate table ${
carbonTable.getDatabaseName
@@ -126,12 +126,12 @@ object PreAggregateDeleteSegmentByDatePreListener extends OperationEventListener
val deleteSegmentByDatePreEvent = event.asInstanceOf[DeleteSegmentByDatePreEvent]
val carbonTable = deleteSegmentByDatePreEvent.carbonTable
if (carbonTable != null) {
- if (carbonTable.hasPreAggregateTables) {
+ if (carbonTable.hasDataMapSchema) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on tables which have a pre-aggregate table. " +
"Drop pre-aggregation table to continue")
}
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on pre-aggregate table")
}
@@ -150,11 +150,11 @@ object PreAggregateDeleteSegmentByIdPreListener extends OperationEventListener {
val tableEvent = event.asInstanceOf[DeleteSegmentByIdPreEvent]
val carbonTable = tableEvent.carbonTable
if (carbonTable != null) {
- if (carbonTable.hasPreAggregateTables) {
+ if (carbonTable.hasDataMapSchema) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on tables which have a pre-aggregate table")
}
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on pre-aggregate table")
}
@@ -190,7 +190,7 @@ object PreAggregateDropColumnPreListener extends OperationEventListener {
s"pre-aggregate table ${ dataMapSchema.getRelationIdentifier.toString}")
}
}
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(s"Cannot drop columns in pre-aggreagate table ${
carbonTable.getDatabaseName}.${ carbonTable.getFactTableName }")
}
@@ -209,11 +209,11 @@ object PreAggregateRenameTablePreListener extends OperationEventListener {
operationContext: OperationContext): Unit = {
val renameTablePostListener = event.asInstanceOf[AlterTableRenamePreEvent]
val carbonTable = renameTablePostListener.carbonTable
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Rename operation for pre-aggregate table is not supported.")
}
- if (carbonTable.hasPreAggregateTables) {
+ if (carbonTable.hasDataMapSchema) {
throw new UnsupportedOperationException(
"Rename operation is not supported for table with pre-aggregate tables")
}
@@ -231,12 +231,12 @@ object UpdatePreAggregatePreListener extends OperationEventListener {
val tableEvent = event.asInstanceOf[UpdateTablePreEvent]
val carbonTable = tableEvent.carbonTable
if (carbonTable != null) {
- if (carbonTable.hasPreAggregateTables) {
+ if (carbonTable.hasDataMapSchema) {
throw new UnsupportedOperationException(
"Update operation is not supported for tables which have a pre-aggregate table. Drop " +
"pre-aggregate tables to continue.")
}
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Update operation is not supported for pre-aggregate table")
}
@@ -255,12 +255,12 @@ object DeletePreAggregatePreListener extends OperationEventListener {
val tableEvent = event.asInstanceOf[DeleteFromTablePreEvent]
val carbonTable = tableEvent.carbonTable
if (carbonTable != null) {
- if (carbonTable.hasPreAggregateTables) {
+ if (carbonTable.hasDataMapSchema) {
throw new UnsupportedOperationException(
"Delete operation is not supported for tables which have a pre-aggregate table. Drop " +
"pre-aggregate tables to continue.")
}
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Delete operation is not supported for pre-aggregate table")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index b926705..62e7623 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -16,13 +16,14 @@
*/
package org.apache.spark.sql.execution.command.preaaggregate
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.JavaConverters._
import org.apache.spark.SparkConf
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Expression, NamedExpression, ScalaUDF}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
@@ -51,9 +52,14 @@ object PreAggregateUtil {
def getParentCarbonTable(plan: LogicalPlan): CarbonTable = {
plan match {
- case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
- if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable
+ case Aggregate(_, _, SubqueryAlias(_, logicalRelation: LogicalRelation, _))
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
+ carbonRelation.metaData.carbonTable
+ case Aggregate(_, _, logicalRelation: LogicalRelation)
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
+ carbonRelation.metaData.carbonTable
case _ => throw new MalformedCarbonCommandException("table does not exist")
}
}
@@ -67,54 +73,86 @@ object PreAggregateUtil {
* @param selectStmt
* @return list of fields
*/
- def validateActualSelectPlanAndGetAttrubites(plan: LogicalPlan,
+ def validateActualSelectPlanAndGetAttributes(plan: LogicalPlan,
selectStmt: String): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
- val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
plan match {
- case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
- if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- val carbonTable = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
- .metaData.carbonTable
- val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
- .getTableName
- val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
- .getDatabaseName
- val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
- .getTableId
- if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) {
+ case Aggregate(groupByExp, aggExp, SubqueryAlias(_, logicalRelation: LogicalRelation, _)) =>
+ getFieldsFromPlan(groupByExp, aggExp, logicalRelation, selectStmt)
+ case Aggregate(groupByExp, aggExp, logicalRelation: LogicalRelation) =>
+ getFieldsFromPlan(groupByExp, aggExp, logicalRelation, selectStmt)
+ }
+ }
+
+ /**
+ * Below method will be used to get the fields from expressions
+ * @param groupByExp
+ * grouping expression
+ * @param aggExp
+ * aggregate expression
+ * @param logicalRelation
+ * logical relation
+ * @param selectStmt
+ * select statement
+ * @return fields from expressions
+ */
+ def getFieldsFromPlan(groupByExp: Seq[Expression],
+ aggExp: Seq[NamedExpression], logicalRelation: LogicalRelation, selectStmt: String):
+ scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
+ val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+ if (!logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+ throw new MalformedCarbonCommandException("Un-supported table")
+ }
+ val carbonTable = logicalRelation.relation.
+ asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
+ .metaData.carbonTable
+ val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+ .getTableName
+ val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+ .getDatabaseName
+ val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+ .getTableId
+ if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) {
+ throw new MalformedCarbonCommandException(
+ "Pre Aggregation is not supported on Pre-Aggregated Table")
+ }
+ groupByExp.map {
+ case attr: AttributeReference =>
+ fieldToDataMapFieldMap += getField(attr.name,
+ attr.dataType,
+ parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+ parentTableName = parentTableName,
+ parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+ case _ =>
+ throw new MalformedCarbonCommandException(s"Unsupported Function in select Statement:${
+ selectStmt } ")
+ }
+ aggExp.map {
+ case Alias(attr: AggregateExpression, _) =>
+ if (attr.isDistinct) {
throw new MalformedCarbonCommandException(
- "Pre Aggregation is not supported on Pre-Aggregated Table")
- }
- aExp.map {
- case Alias(attr: AggregateExpression, _) =>
- if (attr.isDistinct) {
- throw new MalformedCarbonCommandException(
- "Distinct is not supported On Pre Aggregation")
- }
- fieldToDataMapFieldMap ++= (validateAggregateFunctionAndGetFields(carbonTable,
- attr.aggregateFunction,
- parentTableName,
- parentDatabaseName,
- parentTableId))
- case attr: AttributeReference =>
- fieldToDataMapFieldMap += getField(attr.name,
- attr.dataType,
- parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName = parentTableName,
- parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
- case Alias(attr: AttributeReference, _) =>
- fieldToDataMapFieldMap += getField(attr.name,
- attr.dataType,
- parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName = parentTableName,
- parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
- case _ =>
- throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${
- selectStmt } ")
+ "Distinct is not supported On Pre Aggregation")
}
- Some(carbonTable)
+ fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(carbonTable,
+ attr.aggregateFunction,
+ parentTableName,
+ parentDatabaseName,
+ parentTableId)
+ case attr: AttributeReference =>
+ fieldToDataMapFieldMap += getField(attr.name,
+ attr.dataType,
+ parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+ parentTableName = parentTableName,
+ parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+ case Alias(attr: AttributeReference, _) =>
+ fieldToDataMapFieldMap += getField(attr.name,
+ attr.dataType,
+ parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+ parentTableName = parentTableName,
+ parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+ case _@Alias(s: ScalaUDF, name) if name.equals("preAgg") =>
case _ =>
- throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${ selectStmt } ")
+ throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${
+ selectStmt } ")
}
fieldToDataMapFieldMap
}
@@ -347,30 +385,6 @@ object PreAggregateUtil {
}
/**
- * This method will split schema string into multiple parts of configured size and
- * registers the parts as keys in tableProperties which will be read by spark to prepare
- * Carbon Table fields
- *
- * @param sparkConf
- * @param schemaJsonString
- * @return
- */
- private def prepareSchemaJson(sparkConf: SparkConf,
- schemaJsonString: String): String = {
- val threshold = sparkConf
- .getInt(CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD,
- CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT)
- // Split the JSON string.
- val parts = schemaJsonString.grouped(threshold).toSeq
- var schemaParts: Seq[String] = Seq.empty
- schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_NUMPARTS'='${ parts.size }'"
- parts.zipWithIndex.foreach { case (part, index) =>
- schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_PART_PREFIX$index'='$part'"
- }
- schemaParts.mkString(",")
- }
-
- /**
* Validates that the table exists and acquires meta lock on it.
*
* @param dbName
@@ -453,4 +467,30 @@ object PreAggregateUtil {
def checkMainTableLoad(carbonTable: CarbonTable): Boolean = {
SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).nonEmpty
}
+
+ /**
+ * Below method will be used to update logical plan
+ * this is required for creating pre aggregate tables,
+ * so @CarbonPreAggregateRules will not be applied during creation
+ * @param logicalPlan
+ * actual logical plan
+ * @return updated plan
+ */
+ def updatePreAggQueyPlan(logicalPlan: LogicalPlan): LogicalPlan = {
+ val updatedPlan = logicalPlan.transform {
+ case _@Project(projectList, child) =>
+ val buffer = new ArrayBuffer[NamedExpression]()
+ buffer ++= projectList
+ buffer += UnresolvedAlias(Alias(UnresolvedFunction("preAgg",
+ Seq.empty, isDistinct = false), "preAgg")())
+ Project(buffer, child)
+ case Aggregate(groupByExp, aggExp, l: UnresolvedRelation) =>
+ val buffer = new ArrayBuffer[NamedExpression]()
+ buffer ++= aggExp
+ buffer += UnresolvedAlias(Alias(UnresolvedFunction("preAgg",
+ Seq.empty, isDistinct = false), "preAgg")())
+ Aggregate(groupByExp, buffer, l)
+ }
+ updatedPlan
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index ba7e1eb..7bd0fad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -20,111 +20,12 @@ package org.apache.spark.sql.hive
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, ExprId, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{DeclarativeAggregate, _}
+import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.command.mutation.ProjectForDeleteCommand
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-
-
-/**
- * Insert into carbon table from other source
- */
-object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = {
- plan.transform {
- // Wait until children are resolved.
- case p: LogicalPlan if !p.childrenResolved => p
-
- case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
- if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation], child)
- }
- }
-
- def castChildOutput(p: InsertIntoTable,
- relation: CarbonDatasourceHadoopRelation,
- child: LogicalPlan)
- : LogicalPlan = {
- if (relation.carbonRelation.output.size > CarbonCommonConstants
- .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
- sys
- .error("Maximum supported column by carbon is:" + CarbonCommonConstants
- .DEFAULT_MAX_NUMBER_OF_COLUMNS
- )
- }
- val isAggregateTable = !relation.carbonRelation.tableMeta.carbonTable.getTableInfo
- .getParentRelationIdentifiers.isEmpty
- // transform logical plan if the load is for aggregate table.
- val childPlan = if (isAggregateTable) {
- transformAggregatePlan(child)
- } else {
- child
- }
- if (childPlan.output.size >= relation.carbonRelation.output.size) {
- val newChildOutput = childPlan.output.zipWithIndex.map { columnWithIndex =>
- columnWithIndex._1 match {
- case attr: Alias =>
- Alias(attr.child, s"col${ columnWithIndex._2 }")(attr.exprId)
- case attr: Attribute =>
- Alias(attr, s"col${ columnWithIndex._2 }")(NamedExpression.newExprId)
- case attr => attr
- }
- }
- val newChild: LogicalPlan = if (newChildOutput == childPlan.output) {
- p.child
- } else {
- Project(newChildOutput, childPlan)
- }
- InsertIntoCarbonTable(relation, p.partition, newChild, p.overwrite, p.ifNotExists)
- } else {
- sys.error("Cannot insert into target table because column number are different")
- }
- }
-
- /**
- * Transform the logical plan with average(col1) aggregation type to sum(col1) and count(col1).
- *
- * @param logicalPlan
- * @return
- */
- private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
- logicalPlan transform {
- case aggregate@Aggregate(_, aExp, _) =>
- val newExpressions = aExp flatMap {
- case alias@Alias(attrExpression: AggregateExpression, _) =>
- attrExpression.aggregateFunction flatMap {
- case Average(attr: AttributeReference) =>
- Seq(Alias(attrExpression
- .copy(aggregateFunction = Sum(attr.withName(attr.name)),
- resultId = NamedExpression.newExprId),
- attr.name)(),
- Alias(attrExpression
- .copy(aggregateFunction = Count(attr.withName(attr.name)),
- resultId = NamedExpression.newExprId), attr.name)())
- case Average(Cast(attr: AttributeReference, _)) =>
- Seq(Alias(attrExpression
- .copy(aggregateFunction = Sum(attr.withName(attr.name)),
- resultId = NamedExpression.newExprId),
- attr.name)(),
- Alias(attrExpression
- .copy(aggregateFunction = Count(attr.withName(attr.name)),
- resultId = NamedExpression.newExprId), attr.name)())
- case _: DeclarativeAggregate => Seq(alias)
- case _ => Nil
- }
- case namedExpr: NamedExpression => Seq(namedExpr)
- }
- aggregate.copy(aggregateExpressions = newExpressions)
- case plan: LogicalPlan => plan
- }
- }
-}
case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {