Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/28 05:15:02 UTC
[carbondata] branch master updated: [CARBONDATA-3357] Support TableProperties from single parent table and restrict alter/delete/partition on mv
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 2a28dba [CARBONDATA-3357] Support TableProperties from single parent table and restrict alter/delete/partition on mv
2a28dba is described below
commit 2a28dba04236ce976984d9cbc398eb8fa517d6f5
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Wed Apr 24 01:04:21 2019 +0530
[CARBONDATA-3357] Support TableProperties from single parent table and restrict alter/delete/partition on mv
Inherit table properties from the main table to the mv datamap table if the datamap has a
single parent table; otherwise use the default table properties.
Restrict Alter/Delete/Partition operations on MV.
This closes #3184
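For illustration, a minimal sketch of the new behaviour in the QueryTest style used by the
TestAllOperationsOnMV suite added below (table and datamap names here are hypothetical):

    // Single parent table: the mv datamap table inherits properties such as
    // TABLE_BLOCKSIZE from the main table
    sql("create table sales(name string, price int) stored by 'carbondata' " +
      "tblproperties('TABLE_BLOCKSIZE'='256 MB')")
    sql("create datamap sales_mv using 'mv' as " +
      "select name, sum(price) from sales group by name")
    // Restricted: altering the mv datamap table directly now fails
    intercept[ProcessMetaDataException] {
      sql("alter table sales_mv_table add columns(d int)")
    }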
---
.../core/datamap/DataMapStoreManager.java | 27 +-
.../carbondata/core/datamap/DataMapUtil.java | 1 +
.../core/metadata/schema/table/CarbonTable.java | 17 --
.../core/metadata/schema/table/DataMapSchema.java | 14 +
.../carbondata/mv/datamap/MVDataMapProvider.scala | 19 +-
.../apache/carbondata/mv/datamap/MVHelper.scala | 110 ++++++--
.../org/apache/carbondata/mv/datamap/MVUtil.scala | 287 +++++++++++++++++++++
.../mv/rewrite/MVCountAndCaseTestCase.scala | 2 -
.../carbondata/mv/rewrite/MVCreateTestCase.scala | 29 +--
.../mv/rewrite/MVIncrementalLoadingTestcase.scala | 1 -
.../mv/rewrite/MVMultiJoinTestCase.scala | 8 +-
.../carbondata/mv/rewrite/MVTpchTestCase.scala | 10 +-
.../mv/rewrite/TestAllOperationsOnMV.scala | 255 ++++++++++++++++++
.../mv/rewrite/matching/TestSQLBatch.scala | 4 +-
.../preaggregate/TestPreAggregateLoad.scala | 2 +-
.../TestTimeSeriesUnsupportedSuite.scala | 8 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 9 +-
.../command/datamap/CarbonDropDataMapCommand.scala | 9 +
.../management/CarbonCleanFilesCommand.scala | 3 +-
.../execution/command/mv/DataMapListeners.scala | 146 ++++++++++-
.../CarbonAlterTableDropHivePartitionCommand.scala | 7 +-
.../preaaggregate/PreAggregateListeners.scala | 6 +-
.../preaaggregate/PreAggregateTableHelper.scala | 102 +-------
.../schema/CarbonAlterTableRenameCommand.scala | 7 +-
.../spark/sql/execution/strategy/DDLStrategy.scala | 4 +-
.../spark/sql/hive/CarbonAnalysisRules.scala | 10 +-
.../scala/org/apache/spark/util/DataMapUtil.scala | 160 ++++++++++++
27 files changed, 1054 insertions(+), 203 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 81b1fb2..89402c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -281,19 +281,22 @@ public final class DataMapStoreManager {
dataMapCatalogs = new ConcurrentHashMap<>();
List<DataMapSchema> dataMapSchemas = getAllDataMapSchemas();
for (DataMapSchema schema : dataMapSchemas) {
- DataMapCatalog dataMapCatalog = dataMapCatalogs.get(schema.getProviderName());
- if (dataMapCatalog == null) {
- dataMapCatalog = dataMapProvider.createDataMapCatalog();
- if (null == dataMapCatalog) {
- throw new RuntimeException("Internal Error.");
+ if (schema.getProviderName()
+ .equalsIgnoreCase(dataMapProvider.getDataMapSchema().getProviderName())) {
+ DataMapCatalog dataMapCatalog = dataMapCatalogs.get(schema.getProviderName());
+ if (dataMapCatalog == null) {
+ dataMapCatalog = dataMapProvider.createDataMapCatalog();
+ if (null == dataMapCatalog) {
+ throw new RuntimeException("Internal Error.");
+ }
+ dataMapCatalogs.put(schema.getProviderName(), dataMapCatalog);
+ }
+ try {
+ dataMapCatalog.registerSchema(schema);
+ } catch (Exception e) {
+ // Ignore the schema
+ LOGGER.error("Error while registering schema", e);
}
- dataMapCatalogs.put(schema.getProviderName(), dataMapCatalog);
- }
- try {
- dataMapCatalog.registerSchema(schema);
- } catch (Exception e) {
- // Ignore the schema
- LOGGER.error("Error while registering schema", e);
}
}
}
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index 0a604fb..e20f19a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -270,4 +270,5 @@ public class DataMapUtil {
}
return segmentList;
}
+
}
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index afb5fd3..4f4475d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -63,7 +63,6 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
-import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV;
import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;
import com.google.common.collect.Lists;
@@ -1321,22 +1320,6 @@ public class CarbonTable implements Serializable, Writable {
}
/**
- * Return true if MV datamap present in the specified table
- * @param carbonTable
- * @return timeseries data map present
- */
- public static boolean hasMVDataMap(CarbonTable carbonTable) throws IOException {
- List<DataMapSchema> dataMapSchemaList = DataMapStoreManager.getInstance()
- .getDataMapSchemasOfTable(carbonTable);
- for (DataMapSchema dataMapSchema : dataMapSchemaList) {
- if (dataMapSchema.getProviderName().equalsIgnoreCase(MV.toString())) {
- return true;
- }
- }
- return false;
- }
-
- /**
* Return all inverted index columns in this table
*/
public List<ColumnSchema> getInvertedIndexColumns() {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index a48b03c..b927ce0 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
@@ -80,6 +81,11 @@ public class DataMapSchema implements Serializable, Writable {
*/
protected TableSchema childSchema;
+ /**
+ * columns of each main table that are mapped to the datamap table, keyed by main table name
+ */
+ private Map<String, Set<String>> mainTableColumnList;
+
public DataMapSchema(String dataMapName, String providerName) {
this.dataMapName = dataMapName;
this.providerName = providerName;
@@ -250,4 +256,12 @@ public class DataMapSchema implements Serializable, Writable {
@Override public int hashCode() {
return Objects.hash(dataMapName);
}
+
+ public Map<String, Set<String>> getMainTableColumnList() {
+ return mainTableColumnList;
+ }
+
+ public void setMainTableColumnList(Map<String, Set<String>> mainTableColumnList) {
+ this.mainTableColumnList = mainTableColumnList;
+ }
}
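The mainTableColumnList added above records, per parent table, which of its columns feed the
datamap table. A hedged sketch of how a listener could consult it when validating a
drop-column request (isColumnUsedInMV is a hypothetical helper, not part of this commit):

    import java.util

    // Hypothetical check: does the given main-table column feed the mv datamap table?
    def isColumnUsedInMV(
        mainTableColumnList: util.Map[String, util.Set[String]],
        mainTableName: String,
        columnName: String): Boolean = {
      val columns = mainTableColumnList.get(mainTableName)
      null != columns && columns.contains(columnName)
    }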
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
index 26c4cb6..90b7dbc 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -58,7 +58,21 @@ class MVDataMapProvider(
"select statement is mandatory")
}
MVHelper.createMVDataMap(sparkSession, dataMapSchema, ctasSqlStatement, true)
- DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema)
+ try {
+ DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema)
+ if (dataMapSchema.isLazy) {
+ DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+ }
+ } catch {
+ case exception: Exception =>
+ dropTableCommand = new CarbonDropTableCommand(true,
+ new Some[String](dataMapSchema.getRelationIdentifier.getDatabaseName),
+ dataMapSchema.getRelationIdentifier.getTableName,
+ true)
+ dropTableCommand.run(sparkSession)
+ DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
+ throw exception
+ }
}
override def initData(): Unit = {
@@ -141,7 +155,8 @@ class MVDataMapProvider(
dataFrame = Some(queryPlan),
updateModel = None,
tableInfoOp = None,
- internalOptions = Map("mergedSegmentName" -> newLoadName),
+ internalOptions = Map("mergedSegmentName" -> newLoadName,
+ CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"),
partition = Map.empty)
try {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 810449c..4bcaa1d 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -29,17 +29,17 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan, Project}
import org.apache.spark.sql.execution.command.{Field, TableModel, TableNewProcessor}
-import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.util.DataMapUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
-import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, RelationIdentifier}
import org.apache.carbondata.datamap.DataMapManager
import org.apache.carbondata.mv.plans.modular.{GroupBy, Matchable, ModularPlan, Select}
import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite, SummaryDatasetCatalog}
@@ -58,7 +58,12 @@ object MVHelper {
val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
val query = sparkSession.sql(updatedQuery)
val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
- validateMVQuery(sparkSession, logicalPlan)
+ val selectTables = getTables(logicalPlan)
+ if (selectTables.isEmpty) {
+ throw new MalformedCarbonCommandException(
+ s"Non-Carbon table does not support creating MV datamap")
+ }
+ val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
val fullRebuild = isFullReload(logicalPlan)
val fields = logicalPlan.output.map { attr =>
val name = updateColumnName(attr)
@@ -81,29 +86,41 @@ object MVHelper {
}
}
val tableProperties = mutable.Map[String, String]()
- dmProperties.foreach(t => tableProperties.put(t._1, t._2))
-
- val selectTables = getTables(logicalPlan)
val parentTables = new util.ArrayList[String]()
+ val parentTablesList = new util.ArrayList[CarbonTable](selectTables.size)
selectTables.foreach { selectTable =>
val mainCarbonTable = try {
Some(CarbonEnv.getCarbonTable(selectTable.identifier.database,
selectTable.identifier.table)(sparkSession))
} catch {
// Exception handling if it's not a CarbonTable
- case ex : Exception => None
+ case ex: Exception =>
+ throw new MalformedCarbonCommandException(
+ s"Non-Carbon table does not support creating MV datamap")
}
parentTables.add(mainCarbonTable.get.getTableName)
if (!mainCarbonTable.isEmpty && mainCarbonTable.get.isStreamingSink) {
throw new MalformedCarbonCommandException(
s"Streaming table does not support creating MV datamap")
}
+ parentTablesList.add(mainCarbonTable.get)
}
tableProperties.put(CarbonCommonConstants.DATAMAP_NAME, dataMapSchema.getDataMapName)
tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(","))
- // TODO inherit the table properties like sort order, sort scope and block size from parent
- // tables to mv datamap table
+ val fieldRelationMap = MVUtil.getFieldsAndDataMapFieldsFromPlan(
+ logicalPlan, queryString, sparkSession)
+ // If the dataMap is mapped to a single main table, inherit the table properties from that
+ // main table; otherwise use the default table properties. Table properties given in
+ // DMProperties are applied afterwards and override the inherited ones.
+ if (parentTablesList.size() == 1) {
+ DataMapUtil
+ .inheritTablePropertiesFromMainTable(parentTablesList.get(0),
+ fields,
+ fieldRelationMap,
+ tableProperties)
+ }
+ dmProperties.foreach(t => tableProperties.put(t._1, t._2))
// TODO Use a proper DB
val tableIdentifier =
TableIdentifier(dataMapSchema.getDataMapName + "_table",
@@ -129,7 +146,28 @@ object MVHelper {
CarbonCreateTableCommand(TableNewProcessor(tableModel),
tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession)
- dataMapSchema.setCtasQuery(queryString)
+ // Build the map of main table columns mapped to the datamap table and add it to dataMapSchema
+ val mainTableToColumnsMap = new java.util.HashMap[String, util.Set[String]]()
+ val mainTableFieldIterator = fieldRelationMap.values.asJava.iterator()
+ while (mainTableFieldIterator.hasNext) {
+ val value = mainTableFieldIterator.next()
+ value.columnTableRelationList.foreach {
+ columnTableRelation =>
+ columnTableRelation.foreach {
+ mainTable =>
+ if (null == mainTableToColumnsMap.get(mainTable.parentTableName)) {
+ val columns = new util.HashSet[String]()
+ columns.add(mainTable.parentColumnName)
+ mainTableToColumnsMap.put(mainTable.parentTableName, columns)
+ } else {
+ mainTableToColumnsMap.get(mainTable.parentTableName)
+ .add(mainTable.parentColumnName)
+ }
+ }
+ }
+ }
+ dataMapSchema.setMainTableColumnList(mainTableToColumnsMap)
+ dataMapSchema.setCtasQuery(updatedQueryWithDb)
dataMapSchema
.setRelationIdentifier(new RelationIdentifier(tableIdentifier.database.get,
tableIdentifier.table,
@@ -143,14 +181,44 @@ object MVHelper {
dataMapSchema.getRelationIdentifier.setTablePath(tablePath)
dataMapSchema.setParentTables(new util.ArrayList[RelationIdentifier](parentIdents.asJava))
dataMapSchema.getProperties.put("full_refresh", fullRebuild.toString)
- DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
- if (dataMapSchema.isLazy) {
- DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+ try {
+ DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
+ } catch {
+ case ex: Exception =>
+ val dropTableCommand = CarbonDropTableCommand(true,
+ new Some[String](dataMapSchema.getRelationIdentifier.getDatabaseName),
+ dataMapSchema.getRelationIdentifier.getTableName,
+ true)
+ dropTableCommand.run(sparkSession)
+ throw ex
+ }
+ }
+
+ private def isValidSelect(isValidExp: Boolean,
+ s: Select): Boolean = {
+ // Make sure all predicates are present in projections.
+ var predicateList: Seq[AttributeReference] = Seq.empty
+ s.predicateList.map { f =>
+ f.children.collect {
+ case p: AttributeReference =>
+ predicateList = predicateList.+:(p)
+ }
+ }
+ if (predicateList.nonEmpty) {
+ predicateList.forall { p =>
+ s.outputList.exists {
+ case a: Alias =>
+ a.semanticEquals(p) || a.child.semanticEquals(p)
+ case other => other.semanticEquals(p)
+ }
+ }
+ } else {
+ isValidExp
}
}
private def validateMVQuery(sparkSession: SparkSession,
- logicalPlan: LogicalPlan): Unit = {
+ logicalPlan: LogicalPlan): String = {
val dataMapProvider = DataMapManager.get().getDataMapProvider(null,
new DataMapSchema("", DataMapClassProvider.MV.getShortName), sparkSession)
var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
@@ -169,17 +237,24 @@ object MVHelper {
val isValid = modularPlan match {
case g: GroupBy =>
// Make sure all predicates are present in projections.
- g.predicateList.forall{p =>
+ val isValidExp = g.predicateList.forall{p =>
g.outputList.exists{
case a: Alias =>
a.semanticEquals(p) || a.child.semanticEquals(p)
case other => other.semanticEquals(p)
}
}
+ g.child match {
+ case s: Select =>
+ isValidSelect(isValidExp, s)
+ }
+ case s: Select =>
+ isValidSelect(true, s)
case _ => true
}
if (!isValid) {
- throw new UnsupportedOperationException("Group by columns must be present in project columns")
+ throw new UnsupportedOperationException(
+ "Group by/Filter columns must be present in project columns")
}
if (catalog.isMVWithSameQueryPresent(logicalPlan)) {
throw new UnsupportedOperationException("MV with same query present")
@@ -196,6 +271,7 @@ object MVHelper {
if (!expressionValid) {
throw new UnsupportedOperationException("MV doesn't support Coalesce")
}
+ modularPlan.asCompactSQL
}
def updateColumnName(attr: Attribute): String = {
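To summarize the property handling introduced in the MVHelper hunk above, a standalone sketch
of the decision (resolveTableProperties is hypothetical; the real inheritance lives in
DataMapUtil.inheritTablePropertiesFromMainTable):

    import scala.collection.mutable

    // Inherit only when there is exactly one parent table; explicit DMProperties
    // are applied last and therefore override anything inherited.
    def resolveTableProperties(
        parentProps: Seq[Map[String, String]],   // one map per parent table
        dmProperties: Map[String, String]): mutable.Map[String, String] = {
      val tableProperties = mutable.Map[String, String]()
      if (parentProps.size == 1) {
        tableProperties ++= parentProps.head
      }
      dmProperties.foreach { case (k, v) => tableProperties.put(k, v) }
      tableProperties
    }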
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
new file mode 100644
index 0000000..6852695
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.datamap
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Utility class holding the utility methods for mv datamap
+ */
+object MVUtil {
+
+ /**
+ * Below method will be used to validate and get the required fields from select plan
+ */
+ def getFieldsAndDataMapFieldsFromPlan(plan: LogicalPlan,
+ selectStmt: String,
+ sparkSession: SparkSession): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
+ plan match {
+ case Project(projectList, child: Sort) =>
+ getFieldsFromProject(projectList, plan, child)
+ case Project(projectList, _) =>
+ getFieldsFromProject(projectList, plan)
+ case Aggregate(groupByExp, aggExp, _) =>
+ getFieldsFromAggregate(groupByExp, aggExp, plan)
+ }
+ }
+
+ def getFieldsFromProject(projectList: Seq[NamedExpression],
+ plan: LogicalPlan, sort: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = {
+ var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+ sort.transformDown {
+ case agg@Aggregate(groupByExp, aggExp, _) =>
+ fieldToDataMapFieldMap ++= getFieldsFromAggregate(groupByExp, aggExp, plan)
+ agg
+ }
+ fieldToDataMapFieldMap ++= getFieldsFromProject(projectList, plan)
+ fieldToDataMapFieldMap
+ }
+
+ def getFieldsFromProject(projectList: Seq[NamedExpression],
+ plan: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = {
+ var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+ val logicalRelation =
+ plan.collect {
+ case lr: LogicalRelation =>
+ lr
+ }
+ projectList.map {
+ case attr: AttributeReference =>
+ val carbonTable = getCarbonTable(logicalRelation, attr)
+ if (null != carbonTable) {
+ val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
+ val relation = getColumnRelation(attr.name,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+ carbonTable)
+ if (null != relation) {
+ arrayBuffer += relation
+ }
+ fieldToDataMapFieldMap +=
+ getFieldToDataMapFields(attr.name,
+ attr.dataType,
+ attr.qualifier,
+ "",
+ arrayBuffer,
+ carbonTable.getTableName)
+ }
+ case Alias(attr: AttributeReference, name) =>
+ val carbonTable = getCarbonTable(logicalRelation, attr)
+ if (null != carbonTable) {
+ val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
+ val relation = getColumnRelation(attr.name,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+ carbonTable)
+ if (null != relation) {
+ arrayBuffer += relation
+ }
+ fieldToDataMapFieldMap +=
+ getFieldToDataMapFields(name, attr.dataType, attr.qualifier, "", arrayBuffer, "")
+ }
+ }
+ fieldToDataMapFieldMap
+ }
+
+ def getFieldsFromAggregate(groupByExp: Seq[Expression],
+ aggExp: Seq[NamedExpression],
+ plan: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = {
+ var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+ val logicalRelation =
+ plan.collect {
+ case lr: LogicalRelation =>
+ lr
+ }
+ aggExp.map { agg =>
+ var aggregateType: String = ""
+ val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
+ agg.collect {
+ case Alias(attr: AggregateExpression, name) =>
+ if (attr.aggregateFunction.isInstanceOf[Count]) {
+ fieldToDataMapFieldMap +=
+ getFieldToDataMapFields(name,
+ attr.aggregateFunction.dataType,
+ None,
+ attr.aggregateFunction.nodeName,
+ arrayBuffer,
+ "")
+ } else {
+ aggregateType = attr.aggregateFunction.nodeName
+ }
+ case Alias(_, name) =>
+ // In case of arithmetic expressions like sum(a)+sum(b)
+ aggregateType = "arithmetic"
+ }
+ agg.collect {
+ case attr: AttributeReference =>
+ val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr)
+ if (null != carbonTable) {
+ val relation = getColumnRelation(attr.name,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+ carbonTable)
+ if (null != relation) {
+ arrayBuffer += relation
+ }
+ if (aggregateType.isEmpty && arrayBuffer.nonEmpty) {
+ val tableName = carbonTable.getTableName
+ fieldToDataMapFieldMap +=
+ getFieldToDataMapFields(agg.name,
+ agg.dataType,
+ attr.qualifier,
+ aggregateType,
+ arrayBuffer,
+ tableName)
+ }
+ }
+ }
+ if (!aggregateType.isEmpty && arrayBuffer.nonEmpty) {
+ fieldToDataMapFieldMap +=
+ getFieldToDataMapFields(agg.name,
+ agg.dataType,
+ agg.qualifier,
+ aggregateType,
+ arrayBuffer,
+ "")
+ }
+ }
+ groupByExp map { grp =>
+ grp.collect {
+ case attr: AttributeReference =>
+ val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr)
+ if (null != carbonTable) {
+ val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new
+ ArrayBuffer[ColumnTableRelation]()
+ arrayBuffer += getColumnRelation(attr.name,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+ carbonTable)
+ fieldToDataMapFieldMap +=
+ getFieldToDataMapFields(attr.name,
+ attr.dataType,
+ attr.qualifier,
+ "",
+ arrayBuffer,
+ carbonTable.getTableName)
+ }
+ }
+ }
+ fieldToDataMapFieldMap
+ }
+
+ /**
+ * Below method will be used to get the column relation with the parent column
+ */
+ def getColumnRelation(parentColumnName: String,
+ parentTableId: String,
+ parentTableName: String,
+ parentDatabaseName: String,
+ carbonTable: CarbonTable): ColumnTableRelation = {
+ val parentColumn = carbonTable.getColumnByName(parentTableName, parentColumnName)
+ var columnTableRelation: ColumnTableRelation = null
+ if (null != parentColumn) {
+ val parentColumnId = parentColumn.getColumnId
+ columnTableRelation = ColumnTableRelation(parentColumnName = parentColumnName,
+ parentColumnId = parentColumnId,
+ parentTableName = parentTableName,
+ parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+ columnTableRelation
+ } else {
+ columnTableRelation
+ }
+ }
+
+ /**
+ * This method is used to get carbon table for corresponding attribute reference
+ * from logical relation
+ */
+ private def getCarbonTable(logicalRelation: Seq[LogicalRelation],
+ attr: AttributeReference) = {
+ val relations = logicalRelation
+ .filter(lr => lr.output
+ .exists(attrRef => attrRef.name.equalsIgnoreCase(attr.name) &&
+ attrRef.exprId.equals(attr.exprId)))
+ if (relations.nonEmpty) {
+ relations
+ .head.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
+ .metaData.carbonTable
+ } else {
+ null
+ }
+ }
+
+ /**
+ * Below method will be used to get the fields object for mv table
+ */
+ private def getFieldToDataMapFields(name: String,
+ dataType: DataType,
+ qualifier: Option[String],
+ aggregateType: String,
+ columnTableRelationList: ArrayBuffer[ColumnTableRelation],
+ parenTableName: String) = {
+ var actualColumnName =
+ name.replace("(", "_")
+ .replace(")", "")
+ .replace(" ", "_")
+ .replace("=", "")
+ .replace(",", "")
+ if (qualifier.isDefined) {
+ actualColumnName = qualifier.map(qualifier => qualifier + "_" + name)
+ .getOrElse(actualColumnName)
+ }
+ if (qualifier.isEmpty) {
+ if (aggregateType.isEmpty && !parenTableName.isEmpty) {
+ actualColumnName = parenTableName + "_" + actualColumnName
+ }
+ }
+ val rawSchema = '`' + actualColumnName + '`' + ' ' + dataType.typeName
+ val dataMapField = DataMapField(aggregateType, Some(columnTableRelationList))
+ if (dataType.typeName.startsWith("decimal")) {
+ val (precision, scale) = CommonUtil.getScaleAndPrecision(dataType.catalogString)
+ (Field(column = actualColumnName,
+ dataType = Some(dataType.typeName),
+ name = Some(actualColumnName),
+ children = None,
+ precision = precision,
+ scale = scale,
+ rawSchema = rawSchema), dataMapField)
+ } else {
+ (Field(column = actualColumnName,
+ dataType = Some(dataType.typeName),
+ name = Some(actualColumnName),
+ children = None,
+ rawSchema = rawSchema), dataMapField)
+ }
+ }
+}
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
index 567d6a9..af4afb6 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
@@ -47,8 +47,6 @@ class MVCountAndCaseTestCase extends QueryTest with BeforeAndAfterAll{
| FROM data_table
| GROUP BY STARTTIME,LAYER4ID""".stripMargin)
- sql("rebuild datamap data_table_mv")
-
var frame = sql(s"""SELECT MT.`3600` AS `3600`,
| MT.`2250410101` AS `2250410101`,
| count(1) over() as countNum,
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 5016bbe..4f5423e 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -180,7 +180,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
}
test("test create datamap with simple and same projection with datamap filter on non projection column and extra column filter") {
- sql("create datamap datamap9 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
+ sql("create datamap datamap9 using 'mv' as select empname, designation,deptname from fact_table1 where deptname='cloud'")
val frame = sql("select empname,designation from fact_table1 where deptname='cloud'")
val analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap9"))
@@ -189,7 +189,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
}
test("test create datamap with simple and same projection with datamap filter on non projection column and no column filter") {
- sql("create datamap datamap10 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
+ sql("create datamap datamap10 using 'mv' as select empname, designation,deptname from fact_table1 where deptname='cloud'")
val frame = sql("select empname,designation from fact_table1")
val analyzed = frame.queryExecution.analyzed
assert(!verifyMVDataMap(analyzed, "datamap10"))
@@ -198,7 +198,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
}
test("test create datamap with simple and same projection with datamap filter on non projection column and different column filter") {
- sql("create datamap datamap11 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
+ sql("create datamap datamap11 using 'mv' as select empname, designation,deptname from fact_table1 where deptname='cloud'")
val frame = sql("select empname,designation from fact_table1 where designation='SA'")
val analyzed = frame.queryExecution.analyzed
assert(!verifyMVDataMap(analyzed, "datamap11"))
@@ -327,7 +327,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple join and filter on query") {
sql("drop datamap if exists datamap22")
- sql("create datamap datamap22 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)")
+ sql("create datamap datamap22 using 'mv' as select t1.empname, t2.designation,t2.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)")
val frame = sql(
"select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = " +
"t2.empname and t1.empname='shivani'")
@@ -341,7 +341,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple join and filter on query and datamap") {
sql("drop datamap if exists datamap23")
- sql("create datamap datamap23 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'")
+ sql("create datamap datamap23 using 'mv' as select t1.empname, t2.designation, t2.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'")
val frame = sql(
"select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = " +
"t2.empname and t1.empname='shivani'")
@@ -354,7 +354,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple join and filter on datamap and no filter on query") {
sql("drop datamap if exists datamap24")
- sql("create datamap datamap24 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'")
+ sql("create datamap datamap24 using 'mv' as select t1.empname, t2.designation, t2.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'")
val frame = sql(
"select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname")
val analyzed = frame.queryExecution.analyzed
@@ -365,7 +365,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with multiple join") {
sql("drop datamap if exists datamap25")
- sql("create datamap datamap25 using 'mv' as select t1.empname as c1, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) inner join fact_table3 t3 on (t1.empname=t3.empname)")
+ sql("create datamap datamap25 using 'mv' as select t1.empname as c1, t2.designation, t2.empname, t3.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) inner join fact_table3 t3 on (t1.empname=t3.empname)")
val frame = sql(
"select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname")
val analyzed = frame.queryExecution.analyzed
@@ -379,7 +379,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
}
ignore("test create datamap with simple join on datamap and multi join on query") {
- sql("create datamap datamap26 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)")
+ sql("create datamap datamap26 using 'mv' as select t1.empname, t2.designation, t2.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)")
val frame = sql(
"select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2,fact_table3 " +
"t3 where t1.empname = t2.empname and t1.empname=t3.empname")
@@ -391,7 +391,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
}
test("test create datamap with join with group by") {
- sql("create datamap datamap27 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+ sql("create datamap datamap27 using 'mv' as select t1.empname , t2.designation, sum(t1.utilization), sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
val frame = sql(
"select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " +
"where t1.empname = t2.empname group by t1.empname, t2.designation")
@@ -404,7 +404,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with join with group by and sub projection") {
sql("drop datamap if exists datamap28")
- sql("create datamap datamap28 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+ sql("create datamap datamap28 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
val frame = sql(
"select t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where " +
"t1.empname = t2.empname group by t2.designation")
@@ -417,7 +417,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with join with group by and sub projection with filter") {
sql("drop datamap if exists datamap29")
- sql("create datamap datamap29 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+ sql("create datamap datamap29 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
val frame = sql(
"select t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where " +
"t1.empname = t2.empname and t1.empname='shivani' group by t2.designation")
@@ -430,7 +430,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with join with group by with filter") {
sql("drop datamap if exists datamap30")
- sql("create datamap datamap30 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+ sql("create datamap datamap30 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
val frame = sql(
"select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " +
"where t1.empname = t2.empname and t2.designation='SA' group by t1.empname, t2.designation")
@@ -612,7 +612,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with left join on query and equi join on mv with group by with filter") {
sql("drop datamap if exists datamap45")
- sql("create datamap datamap45 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation")
+ sql("create datamap datamap45 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation")
// During optimization Spark converts a left outer join to an equi join when a filter is present on the right-side table
val frame = sql(
"select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " +
@@ -649,7 +649,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
}
test("jira carbondata-2528-2") {
-
sql("drop datamap if exists MV_order")
sql("drop datamap if exists MV_desc_order")
sql("create datamap MV_order using 'mv' as select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname")
@@ -910,7 +909,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
val exception_tb_mv2: Exception = intercept[Exception] {
sql("create datamap dm_stream_test2 using 'mv' as select t1.empname as c1, t2.designation, " +
- "t2.empname as c2 from (fact_table1 t1 inner join fact_streaming_table2 t2 " +
+ "t2.empname as c2,t3.empname from (fact_table1 t1 inner join fact_streaming_table2 t2 " +
"on (t1.empname = t2.empname)) inner join fact_table_parquet t3 " +
"on (t1.empname = t3.empname)")
}
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
index bbd7b4c..2e64055 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
@@ -182,7 +182,6 @@ class MVIncrementalLoadingTestcase extends QueryTest with BeforeAndAfterAll {
sql(s"rebuild datamap datamap1")
loadDataToFactTable("test_table")
sql(s"rebuild datamap datamap1")
- checkExistence(sql("show segments for table datamap1_table"), false, "0.1")
sql("alter datamap datamap1 compact 'major'")
val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
CarbonCommonConstants.DATABASE_DEFAULT_NAME,
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
index bfd621d..4e3eb10 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
@@ -48,8 +48,9 @@ class MVMultiJoinTestCase extends QueryTest with BeforeAndAfterAll {
|inner join areas as c on c.pid=p.aid
|where p.title = 'hebei'
""".stripMargin
- sql("create datamap table_mv using 'mv' as " + mvSQL)
- sql("rebuild datamap table_mv")
+ sql("create datamap table_mv using 'mv' as " +
+ "select p.title,c.title,c.pid,p.aid from areas as p inner join areas as c on " +
+ "c.pid=p.aid where p.title = 'hebei'")
val frame = sql(mvSQL)
assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv"))
checkAnswer(frame, Seq(Row("hebei","shijiazhuang"), Row("hebei","handan")))
@@ -70,8 +71,7 @@ class MVMultiJoinTestCase extends QueryTest with BeforeAndAfterAll {
| left join dim_table dim_other on sdr.name = dim_other.name
| group by sdr.name,dim.age,dim_other.height
""".stripMargin
- sql("create datamap table_mv using 'mv' as " + mvSQL)
- sql("rebuild datamap table_mv")
+ sql("create datamap table_mv using 'mv' as " + "select sdr.name,sum(sdr.score),dim.age,dim_other.height,count(dim.name) as c1, count(dim_other.name) as c2 from sdr_table sdr left join dim_table dim on sdr.name = dim.name left join dim_table dim_other on sdr.name = dim_other.name group by sdr.name,dim.age,dim_other.height")
val frame = sql(mvSQL)
assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv"))
checkAnswer(frame, Seq(Row("lily",80,30,160),Row("tom",120,20,170)))
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
index 5788a23..b5d874a 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
@@ -110,7 +110,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch3 with no filters on mv") {
sql(s"drop datamap if exists datamap5")
- sql("create datamap datamap5 using 'mv' as select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority,c_mktsegment,l_shipdate from customer, orders, lineitem where c_custkey = o_custkey and l_orderkey = o_orderkey group by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate")
+ sql("create datamap datamap5 using 'mv' as select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority,c_mktsegment,l_shipdate, c_custkey as c1, o_custkey as c2,o_orderkey as o1 from customer, orders, lineitem where c_custkey = o_custkey and l_orderkey = o_orderkey group by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate,c_custkey,o_custkey, o_orderkey ")
val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap5"))
@@ -150,7 +150,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch5 with no filters on mv") {
sql(s"drop datamap if exists datamap8")
- sql("create datamap datamap8 using 'mv' as select n_name,o_orderdate,r_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey group by n_name,o_orderdate,r_name")
+ sql("create datamap datamap8 using 'mv' as select n_name,o_orderdate,r_name, sum(l_extendedprice * (1 - l_discount)) as revenue, sum(c_custkey), sum(o_custkey), sum(l_orderkey),sum(o_orderkey), sum(l_suppkey), sum(s_suppkey), sum(c_nationkey), sum(s_nationkey), sum(n_nationkey), sum(n_regionkey), sum(r_regionkey) from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey an [...]
val df = sql("select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name order by revenue desc")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap8"))
@@ -160,7 +160,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch6") {
sql(s"drop datamap if exists datamap9")
- sql("create datamap datamap9 using 'mv' as select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
+ sql("create datamap datamap9 using 'mv' as select sum(l_extendedprice * l_discount) as revenue, count(l_shipdate), sum(l_discount),sum(l_quantity) from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
val df = sql("select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap9"))
@@ -182,7 +182,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch7 part of query1") {
sql(s"drop datamap if exists datamap11")
- sql("create datamap datamap11 using 'mv' as select l_shipdate,n_name , l_extendedprice , l_discount from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
+ sql("create datamap datamap11 using 'mv' as select l_shipdate,n_name , l_extendedprice , l_discount, s_suppkey,l_suppkey, o_orderkey,l_orderkey, c_custkey, o_custkey, s_nationkey, n1.n_nationkey, c_nationkey from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
val df = sql("select year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE') or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01') and date('1996-12-31')")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap11"))
@@ -192,7 +192,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch7 part of query2 (core issue)") {
sql(s"drop datamap if exists datamap12")
- sql("create datamap datamap12 using 'mv' as select n1.n_name, l_shipdate, l_extendedprice ,l_discount from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
+ sql("create datamap datamap12 using 'mv' as select n1.n_name, l_shipdate, l_extendedprice ,l_discount,s_suppkey, l_suppkey,o_orderkey,l_orderkey, c_custkey,o_custkey,s_nationkey, n1.n_nationkey,c_nationkey from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
val df = sql("select supp_nation, l_year, sum(volume) as revenue from ( select n1.n_name as supp_nation, year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE' ) or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01') and date('19 [...]
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap12"))
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
new file mode 100644
index 0000000..3978bd1
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -0,0 +1,255 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
+
+/**
+ * Test Class for MV Datamap to verify all scenarios
+ */
+class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
+
+ override def beforeEach(): Unit = {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("drop table IF EXISTS testtable")
+ sql("create table testtable(name string, c_code int, price int) stored by 'carbondata'")
+ sql("insert into table testtable select 'abc',21,2000")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name,sum(price) " +
+ "from maintable group by name")
+ sql("rebuild datamap dm1")
+ checkResult()
+ }
+
+ private def checkResult(): Unit = {
+ checkAnswer(sql("select name,sum(price) from maintable group by name"),
+ sql("select name,sum(price) from maintable group by name"))
+ }
+
+ override def afterEach(): Unit = {
+ sql("drop table IF EXISTS maintable")
+ sql("drop table IF EXISTS testtable")
+ sql("drop table if exists par_table")
+ }
+
+ test("test alter add column on maintable") {
+ sql("alter table maintable add columns(d int)")
+ sql("insert into table maintable select 'abc',21,2000,30")
+ sql("rebuild datamap dm1")
+ checkResult()
+ }
+
+ test("test alter add column on datamaptable") {
+ intercept[ProcessMetaDataException] {
+ sql("alter table dm1_table add columns(d int)")
+ }.getMessage.contains("Cannot add columns in a DataMap table default.dm1_table")
+ }
+
+ test("test drop column on maintable") {
+ // check drop column not present in datamap table
+ sql("alter table maintable drop columns(c_code)")
+ checkResult()
+ // check drop column present in datamap table
+ intercept[ProcessMetaDataException] {
+ sql("alter table maintable drop columns(name)")
+ }.getMessage.contains("Column name cannot be dropped because it exists in mv datamap: dm1")
+ }
+
+ test("test alter drop column on datamaptable") {
+ intercept[ProcessMetaDataException] {
+ sql("alter table dm1_table drop columns(maintable_name)")
+ }.getMessage.contains("Cannot drop columns present in a datamap table default.dm1_table")
+ }
+
+ test("test rename column on maintable") {
+ // check rename column not present in datamap table
+ sql("alter table maintable change c_code d_code int")
+ checkResult()
+ // check rename column present in mv datamap table
+ intercept[ProcessMetaDataException] {
+ sql("alter table maintable change name name1 string")
+ }.getMessage.contains("Column name exists in a MV datamap. Drop MV datamap to continue")
+ }
+
+ test("test alter rename column on datamaptable") {
+ intercept[ProcessMetaDataException] {
+ sql("alter table dm1_table change sum_price sum_cost int")
+ }.getMessage.contains("Cannot change data type or rename column for columns " +
+ "present in mv datamap table default.dm1_table")
+ }
+
+ test("test alter rename table") {
+ //check rename maintable
+ intercept[MalformedCarbonCommandException] {
+ sql("alter table maintable rename to maintable_rename")
+ }.getMessage.contains("alter rename is not supported for datamap table or for tables which have child datamap")
+ //check rename datamaptable
+ intercept[MalformedCarbonCommandException] {
+ sql("alter table dm1_table rename to dm11_table")
+ }.getMessage.contains("alter rename is not supported for datamap table or for tables which have child datamap")
+ }
+
+ test("test alter change datatype") {
+ //change datatype for column
+ intercept[ProcessMetaDataException] {
+ sql("alter table maintable change price price bigint")
+ }.getMessage.contains("Column price exists in a MV datamap. Drop MV datamap to continue")
+ //change datatype for column not present in datamap table
+ sql("alter table maintable change c_code c_code bigint")
+ checkResult()
+ //change datatype for column present in datamap table
+ intercept[ProcessMetaDataException] {
+ sql("alter table dm1_table change sum_price sum_price bigint")
+ }.getMessage.contains("Cannot change data type or rename column for columns " +
+ "present in mv datamap table default.dm1_table")
+ }
+
+ test("test dmproperties") {
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 on table maintable using 'mv' WITH DEFERRED REBUILD dmproperties" +
+ "('LOCAL_DICTIONARY_ENABLE'='false') as select name,price from maintable")
+ checkExistence(sql("describe formatted dm1_table"), true, "Local Dictionary Enabled false")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 on table maintable using 'mv' WITH DEFERRED REBUILD dmproperties('TABLE_BLOCKSIZE'='256 MB') " +
+ "as select name,price from maintable")
+ checkExistence(sql("describe formatted dm1_table"), true, "Table Block Size 256 MB")
+ }
+
+ test("test table properties") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by 'carbondata' tblproperties('LOCAL_DICTIONARY_ENABLE'='false')")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name,price from maintable")
+ checkExistence(sql("describe formatted dm1_table"), true, "Local Dictionary Enabled false")
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by 'carbondata' tblproperties('TABLE_BLOCKSIZE'='256 MB')")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name,price from maintable")
+ checkExistence(sql("describe formatted dm1_table"), true, "Table Block Size 256 MB")
+ }
+
+ test("test delete segment by id on main table") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("Delete from table maintable where segment.id in (0)")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name,sum(price) " +
+ "from maintable group by name")
+ sql("rebuild datamap dm1")
+ assert(intercept[UnsupportedOperationException] {
+ sql("Delete from table maintable where segment.id in (1)")
+ }.getMessage.contains("Delete segment operation is not supported on tables having child datamap"))
+ assert(intercept[UnsupportedOperationException] {
+ sql("Delete from table dm1_table where segment.id in (0)")
+ }.getMessage.contains("Delete segment operation is not supported on datamap table"))
+ }
+
+ test("test delete segment by date on main table") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("Delete from table maintable where segment.id in (0)")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name,sum(price) " +
+ "from maintable group by name")
+ sql("rebuild datamap dm1")
+ assert(intercept[UnsupportedOperationException] {
+ sql("DELETE FROM TABLE maintable WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'")
+ }.getMessage.contains("Delete segment operation is not supported on tables having child datamap"))
+ assert(intercept[UnsupportedOperationException] {
+ sql("DELETE FROM TABLE dm1_table WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'")
+ }.getMessage.contains("Delete segment operation is not supported on datamap table"))
+ }
+
+ test("test partition table with mv") {
+ sql("drop table if exists par_table")
+ sql("CREATE TABLE par_table(id INT, name STRING, age INT) PARTITIONED BY(city string) STORED BY 'carbondata'")
+ sql("insert into par_table values(1,'abc',3,'def')")
+ sql("drop datamap if exists p1")
+ sql("create datamap p1 using 'mv' WITH DEFERRED REBUILD as select city, id from par_table")
+ sql("rebuild datamap p1")
+ assert(intercept[MalformedCarbonCommandException] {
+ sql("alter table par_table drop partition (city='def')")
+ }.getMessage.contains("Drop Partition is not supported for datamap table or for tables which have child datamap"))
+ sql("drop datamap if exists p1")
+ }
+
+ test("test direct load to mv datamap table") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("drop datamap if exists dm1")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name " +
+ "from maintable")
+ sql("rebuild datamap dm1")
+ assert(intercept[UnsupportedOperationException] {
+ sql("insert into dm1_table select 2")
+ }.getMessage.contains("Cannot insert/load data directly into pre-aggregate/child table"))
+ sql("drop table IF EXISTS maintable")
+ }
+
+ test("test drop datamap with tablename") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("drop datamap if exists dm1 on table maintable")
+ sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select price " +
+ "from maintable")
+ sql("rebuild datamap dm1")
+ checkAnswer(sql("select price from maintable"), Seq(Row(2000)))
+ checkExistence(sql("show datamap on table maintable"), true, "dm1")
+ sql("drop datamap dm1 on table maintable")
+ checkExistence(sql("show datamap on table maintable"), false, "dm1")
+ sql("drop table IF EXISTS maintable")
+ }
+
+ test("test mv with attribute having qualifier") {
+ sql("drop table if exists maintable")
+ sql("create table maintable (product string) partitioned by (amount int) stored by 'carbondata' ")
+ sql("insert into maintable values('Mobile',2000)")
+ sql("drop datamap if exists p")
+ sql("Create datamap p using 'mv' as Select p.product, p.amount from maintable p where p.product = 'Mobile'")
+ sql("rebuild datamap p")
+ checkAnswer(sql("Select p.product, p.amount from maintable p where p.product = 'Mobile'"), Seq(Row("Mobile", 2000)))
+ sql("drop table IF EXISTS maintable")
+ }
+
+ test("test mv with non-carbon table") {
+ sql("drop table if exists noncarbon")
+ sql("create table noncarbon (product string,amount int)")
+ sql("insert into noncarbon values('Mobile',2000)")
+ sql("drop datamap if exists p")
+ assert(intercept[MalformedCarbonCommandException] {
+ sql("Create datamap p using 'mv' as Select product from noncarbon")
+ }.getMessage.contains("Non-Carbon table does not support creating MV datamap"))
+ sql("drop table if exists noncarbon")
+ }
+
+}
+
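The dmproperties/tblproperties tests above pin down the precedence this commit introduces:
a property set explicitly in dmproperties wins, otherwise the value is inherited from the
single parent table, otherwise the default applies. A minimal sketch of that lookup order,
using an illustrative Map-based model rather than the actual TableInfo plumbing:

    // Illustrative sketch only: dmproperties take precedence over inherited
    // parent table properties, which take precedence over the default value.
    def effectiveProperty(
        key: String,
        dmProperties: Map[String, String],
        parentProperties: Map[String, String],
        default: String): String = {
      dmProperties.getOrElse(key, parentProperties.getOrElse(key, default))
    }

    // e.g. effectiveProperty("local_dictionary_enable", Map.empty,
    //   Map("local_dictionary_enable" -> "false"), "true") returns "false"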
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala
index 96f1816..07cd232 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala
@@ -23,9 +23,9 @@ object TestSQLBatch {
val sampleTestCases = Seq(
("case_1",
s"""
- |SELECT i_item_id
+ |SELECT i_item_id, i_item_sk
|FROM Item
- |WHERE i_item_sk = 1
+ |WHERE i_item_sk = 2
""".stripMargin.trim,
s"""
|SELECT i_item_id, i_item_sk
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
index 49a62a2..7ba8300 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -211,7 +211,7 @@ class TestPreAggregateLoad extends SparkQueryTest with BeforeAndAfterAll with Be
.stripMargin)
assert(intercept[RuntimeException] {
sql(s"insert into maintable_preagg_sum values(1, 30)")
- }.getMessage.equalsIgnoreCase("Cannot insert/load data directly into pre-aggregate table"))
+ }.getMessage.equalsIgnoreCase("Cannot insert/load data directly into pre-aggregate/child table"))
}
test("test whether all segments are loaded into pre-aggregate table if segments are set on main table") {
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
index a7e425c..7fa2672 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
@@ -71,7 +71,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi
sql(s"INSERT INTO maintable_agg1_minute VALUES('2016-02-23 09:01:00.0', 60)")
}
assert(e.getMessage.equalsIgnoreCase(
- "Cannot insert/load data directly into pre-aggregate table"))
+ "Cannot insert/load data directly into pre-aggregate/child table"))
// check value after inserting
checkAnswer(sql("SELECT * FROM maintable_agg1_minute"),
@@ -94,7 +94,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi
sql(s"INSERT INTO maintable_agg1_minute VALUES('2016-02-23 09:01:00.0', 60)")
}
assert(e.getMessage.equalsIgnoreCase(
- "Cannot insert/load data directly into pre-aggregate table"))
+ "Cannot insert/load data directly into pre-aggregate/child table"))
}
test("test timeseries unsupported 3: don't support insert") {
@@ -118,7 +118,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi
sql(s"INSERT INTO maintable_agg1_minute VALUES('2016-02-23 09:01:00.0', 'hello', 60)")
}
assert(e.getMessage.equalsIgnoreCase(
- "Cannot insert/load data directly into pre-aggregate table"))
+ "Cannot insert/load data directly into pre-aggregate/child table"))
}
test("test timeseries unsupported 4: don't support load") {
@@ -147,7 +147,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi
""".stripMargin)
}
assert(e.getMessage.equalsIgnoreCase(
- "Cannot insert/load data directly into pre-aggregate table"))
+ "Cannot insert/load data directly into pre-aggregate/child table"))
}
test("test timeseries unsupported 5: don't support update") {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index d9dec68..3c6b265 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
import org.apache.spark.sql.execution.command.cache._
-import org.apache.spark.sql.execution.command.mv.{AlterDataMaptableCompactionPostListener, LoadPostDataMapListener}
+import org.apache.spark.sql.execution.command.mv._
import org.apache.spark.sql.execution.command.preaaggregate._
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
import org.apache.spark.sql.hive._
@@ -195,6 +195,13 @@ object CarbonEnv {
.addListener(classOf[DropTableCacheEvent], DropCacheBloomEventListener)
.addListener(classOf[ShowTableCacheEvent], ShowCachePreAggEventListener)
.addListener(classOf[ShowTableCacheEvent], ShowCacheBloomEventListener)
+ .addListener(classOf[DeleteSegmentByIdPreEvent], DataMapDeleteSegmentPreListener)
+ .addListener(classOf[DeleteSegmentByDatePreEvent], DataMapDeleteSegmentPreListener)
+ .addListener(classOf[AlterTableDropColumnPreEvent], DataMapDropColumnPreListener)
+ .addListener(classOf[AlterTableColRenameAndDataTypeChangePreEvent],
+ DataMapChangeDataTypeorRenameColumnPreListener)
+ .addListener(classOf[AlterTableAddColumnPreEvent], DataMapAddColumnsPreListener)
+
}
/**
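The listeners registered above rely on CarbonData's pre-event veto pattern: a listener
subscribed to a *PreEvent can abort the triggering command by throwing from onEvent. A
minimal sketch of such a listener (the object name and message below are illustrative,
not part of this commit):

    import org.apache.carbondata.events._

    object BlockOnChildTablePreListener extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        event match {
          // veto delete-segment when it targets a datamap (child) table
          case e: DeleteSegmentByIdPreEvent if e.carbonTable.isChildTable =>
            throw new UnsupportedOperationException(
              "operation is blocked on datamap tables")
          case _ => // all other events pass through untouched
        }
      }
    }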
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 0bafe04..b4e60fb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -122,6 +122,15 @@ case class CarbonDropDataMapCommand(
dropDataMapFromSystemFolder(sparkSession)
return Seq.empty
}
+ } else if (mainTable != null) {
+ // If the main table is defined and the datamap is an MV datamap, drop it from the system folder
+ val dmSchema = DataMapStoreManager.getInstance().getAllDataMapSchemas.asScala
+ .filter(dataMapSchema => dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName))
+ if (dmSchema.nonEmpty && (!dmSchema.head.isIndexDataMap &&
+ null != dmSchema.head.getRelationIdentifier)) {
+ dropDataMapFromSystemFolder(sparkSession)
+ return Seq.empty
+ }
}
// drop preaggregate datamap.
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index c142398..a2a9d3c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, Checker, DataCommand}
import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.util.DataMapUtil
import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -71,7 +72,7 @@ case class CarbonCleanFilesCommand(
isInternalCleanCall = true)
}.toList
cleanFileCommands.foreach(_.processMetadata(sparkSession))
- } else if (CarbonTable.hasMVDataMap(carbonTable)) {
+ } else if (DataMapUtil.hasMVDataMap(carbonTable)) {
val allDataMapSchemas = DataMapStoreManager.getInstance
.getDataMapSchemasOfTable(carbonTable).asScala
.filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
index f94b73a..5b1d7e5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
@@ -18,22 +18,32 @@
package org.apache.spark.sql.execution.command.mv
import scala.collection.JavaConverters._
+import scala.collection.mutable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.AlterTableModel
import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
+import org.apache.spark.util.DataMapUtil
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
import org.apache.carbondata.datamap.DataMapManager
-import org.apache.carbondata.events.{
- AlterTableCompactionPreStatusUpdateEvent,
- DeleteFromTablePostEvent, Event, OperationContext, OperationEventListener, UpdateTablePostEvent
-}
+import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostExecutionEvent
import org.apache.carbondata.processing.merger.CompactionType
+
+object DataMapListeners {
+ def getDataMapTableColumns(dataMapSchema: DataMapSchema,
+ carbonTable: CarbonTable): mutable.Buffer[String] = {
+ // main-table columns that are referenced by the given datamap
+ dataMapSchema.getMainTableColumnList.get(carbonTable.getTableName).asScala
+ }
+}
+
/**
* Listener to trigger compaction on mv datamap after main table compaction
*/
@@ -139,3 +149,129 @@ object LoadPostDataMapListener extends OperationEventListener {
}
}
}
+
+/**
+ * Listener to block operations like delete segment by id or by date on tables
+ * having an mv datamap and on mv datamap tables
+ */
+object DataMapDeleteSegmentPreListener extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ val carbonTable = event match {
+ case e: DeleteSegmentByIdPreEvent => e.carbonTable
+ case e: DeleteSegmentByDatePreEvent => e.carbonTable
+ }
+ if (null != carbonTable) {
+ if (DataMapUtil.hasMVDataMap(carbonTable)) {
+ throw new UnsupportedOperationException(
+ "Delete segment operation is not supported on tables having child datamap")
+ }
+ if (carbonTable.isChildTable) {
+ throw new UnsupportedOperationException(
+ "Delete segment operation is not supported on datamap table")
+ }
+ }
+ }
+}
+
+object DataMapAddColumnsPreListener extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ val addColumnPreEvent = event.asInstanceOf[AlterTableAddColumnPreEvent]
+ val carbonTable = addColumnPreEvent.carbonTable
+ if (carbonTable.isChildTable) {
+ throw new UnsupportedOperationException(
+ s"Cannot add columns in a DataMap table " +
+ s"${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
+ }
+ }
+}
+
+
+object DataMapDropColumnPreListener extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ val dropColumnPreEvent = event.asInstanceOf[AlterTableDropColumnPreEvent]
+ val carbonTable = dropColumnPreEvent.carbonTable
+ val alterTableDropColumnModel = dropColumnPreEvent.alterTableDropColumnModel
+ val columnsToBeDropped = alterTableDropColumnModel.columns
+ if (DataMapUtil.hasMVDataMap(carbonTable)) {
+ val dataMapSchemaList = DataMapStoreManager.getInstance
+ .getDataMapSchemasOfTable(carbonTable).asScala
+ for (dataMapSchema <- dataMapSchemaList) {
+ if (null != dataMapSchema && !dataMapSchema.isIndexDataMap) {
+ val listOfColumns = DataMapListeners.getDataMapTableColumns(dataMapSchema, carbonTable)
+ val columnExistsInChild = listOfColumns.find(columnsToBeDropped.contains)
+ columnExistsInChild.foreach { column =>
+ throw new UnsupportedOperationException(
+ s"Column $column cannot be dropped because it exists in " +
+ s"${ dataMapSchema.getProviderName } datamap:${ dataMapSchema.getDataMapName }")
+ }
+ }
+ }
+ }
+ if (carbonTable.isChildTable) {
+ throw new UnsupportedOperationException(
+ s"Cannot drop columns present in a datamap table ${ carbonTable.getDatabaseName }." +
+ s"${ carbonTable.getTableName }")
+ }
+ }
+}
+
+object DataMapChangeDataTypeorRenameColumnPreListener
+ extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ val colRenameDataTypeChangePreEvent = event
+ .asInstanceOf[AlterTableColRenameAndDataTypeChangePreEvent]
+ val carbonTable = colRenameDataTypeChangePreEvent.carbonTable
+ val alterTableDataTypeChangeModel = colRenameDataTypeChangePreEvent
+ .alterTableDataTypeChangeModel
+ val columnToBeAltered: String = alterTableDataTypeChangeModel.columnName
+ if (DataMapUtil.hasMVDataMap(carbonTable)) {
+ val dataMapSchemaList = DataMapStoreManager.getInstance
+ .getDataMapSchemasOfTable(carbonTable).asScala
+ for (dataMapSchema <- dataMapSchemaList) {
+ if (null != dataMapSchema && !dataMapSchema.isIndexDataMap) {
+ val listOfColumns = DataMapListeners.getDataMapTableColumns(dataMapSchema, carbonTable)
+ if (listOfColumns.contains(columnToBeAltered)) {
+ throw new UnsupportedOperationException(
+ s"Column $columnToBeAltered exists in a " + dataMapSchema.getProviderName +
+ " datamap. Drop " + dataMapSchema.getProviderName + " datamap to continue")
+ }
+ }
+ }
+ }
+ if (carbonTable.isChildTable) {
+ throw new UnsupportedOperationException(
+ s"Cannot change data type or rename column for columns present in mv datamap table " +
+ s"${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
+ }
+ }
+}
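For completeness, these pre-listeners only take effect because the corresponding commands
fire the matching event before mutating anything; a listener that throws aborts the
command at that point. A hedged sketch of the firing side (assuming the command has
already constructed the event object, here called deleteSegmentByIdPreEvent):

    // inside a command, before the destructive step: fire the pre-event so that
    // any registered listener can veto the operation by throwing an exception
    val operationContext = new OperationContext
    OperationListenerBus.getInstance()
      .fireEvent(deleteSegmentByIdPreEvent, operationContext)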
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index a9b581c..9119375 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -26,8 +26,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand}
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, DataMapUtil}
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.indexstore.PartitionSpec
@@ -69,6 +70,10 @@ case class CarbonAlterTableDropHivePartitionCommand(
table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
setAuditTable(table)
setAuditInfo(Map("partition" -> specs.mkString(",")))
+ if (DataMapUtil.hasMVDataMap(table) || table.isChildTable) {
+ throw new MalformedCarbonCommandException(
+ "Drop Partition is not supported for datamap table or for tables which have child datamap")
+ }
if (table.isHivePartitionTable) {
var locks = List.empty[ICarbonLock]
try {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 4a0b492..c4c3539 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil, Segment}
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock}
@@ -717,9 +717,9 @@ object LoadPreAggregateTablePreListener extends OperationEventListener {
val carbonLoadModel = loadEvent.getCarbonLoadModel
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val isInternalLoadCall = carbonLoadModel.isAggLoadRequest
- if (table.isChildDataMap && !isInternalLoadCall) {
+ if ((table.isChildDataMap || table.isChildTable) && !isInternalLoadCall) {
throw new UnsupportedOperationException(
- "Cannot insert/load data directly into pre-aggregate table")
+ "Cannot insert/load data directly into pre-aggregate/child table")
}
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index b729347..86b2d00 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-import org.apache.spark.util.PartitionUtils
+import org.apache.spark.util.{DataMapUtil, PartitionUtils}
import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
@@ -90,104 +90,8 @@ case class PreAggregateTableHelper(
throw new MalformedDataMapCommandException(
"Parent table name is different in select and create")
}
- var neworder = Seq[String]()
- val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
- parentOrder.foreach(parentcol =>
- fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty &&
- parentcol.equals(fieldRelationMap(col).
- columnTableRelationList.get(0).parentColumnName))
- .map(cols => neworder :+= cols.column))
- tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(","))
- tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable.
- getTableProperties.asScala.getOrElse("sort_scope", CarbonCommonConstants
- .LOAD_SORT_SCOPE_DEFAULT))
- tableProperties
- .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
- tableProperties.put(CarbonCommonConstants.FLAT_FOLDER,
- parentTable.getTableInfo.getFactTable.getTableProperties.asScala.getOrElse(
- CarbonCommonConstants.FLAT_FOLDER, CarbonCommonConstants.DEFAULT_FLAT_FOLDER))
-
- // Datamap table name and columns are automatically added prefix with parent table name
- // in carbon. For convenient, users can type column names same as the ones in select statement
- // when config dmproperties, and here we update column names with prefix.
- // If longStringColumn is not present in dm properties then we take long_string_columns from
- // the parent table.
- var longStringColumn = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS)
- if (longStringColumn.isEmpty) {
- val longStringColumnInParents = parentTable.getTableInfo.getFactTable.getTableProperties
- .asScala
- .getOrElse(CarbonCommonConstants.LONG_STRING_COLUMNS, "").split(",").map(_.trim)
- val varcharDatamapFields = scala.collection.mutable.ArrayBuffer.empty[String]
- fieldRelationMap foreach (fields => {
- val aggFunc = fields._2.aggregateFunction
- val relationList = fields._2.columnTableRelationList
- // check if columns present in datamap are long_string_col in parent table. If they are
- // long_string_columns in parent, make them long_string_columns in datamap
- if (aggFunc.isEmpty && relationList.size == 1 && longStringColumnInParents
- .contains(relationList.head.head.parentColumnName)) {
- varcharDatamapFields += relationList.head.head.parentColumnName
- }
- })
- if (!varcharDatamapFields.isEmpty) {
- longStringColumn = Option(varcharDatamapFields.mkString(","))
- }
- }
-
- if (longStringColumn != None) {
- val fieldNames = fields.map(_.column)
- val newLongStringColumn = longStringColumn.get.split(",").map(_.trim).map{ colName =>
- val newColName = parentTable.getTableName.toLowerCase() + "_" + colName
- if (!fieldNames.contains(newColName)) {
- throw new MalformedDataMapCommandException(
- CarbonCommonConstants.LONG_STRING_COLUMNS.toUpperCase() + ":" + colName
- + " does not in datamap")
- }
- newColName
- }
- tableProperties.put(CarbonCommonConstants.LONG_STRING_COLUMNS,
- newLongStringColumn.mkString(","))
- }
-
- // inherit the local dictionary properties of main parent table
- tableProperties
- .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
- parentTable.getTableInfo.getFactTable.getTableProperties.asScala
- .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, "false"))
- tableProperties
- .put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
- parentTable.getTableInfo.getFactTable.getTableProperties.asScala
- .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
- CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT))
- val parentDictInclude = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
- .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, "").split(",")
-
- val parentDictExclude = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
- .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, "").split(",")
-
- val newDictInclude =
- parentDictInclude.flatMap(parentcol =>
- fields.collect {
- case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
- parentcol.equals(fieldRelationMap(col).
- columnTableRelationList.get.head.parentColumnName) =>
- col.column
- })
-
- val newDictExclude = parentDictExclude.flatMap(parentcol =>
- fields.collect {
- case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
- parentcol.equals(fieldRelationMap(col).
- columnTableRelationList.get.head.parentColumnName) =>
- col.column
- })
- if (newDictInclude.nonEmpty) {
- tableProperties
- .put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, newDictInclude.mkString(","))
- }
- if (newDictExclude.nonEmpty) {
- tableProperties
- .put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, newDictExclude.mkString(","))
- }
+ DataMapUtil
+ .inheritTablePropertiesFromMainTable(parentTable, fields, fieldRelationMap, tableProperties)
val tableIdentifier =
TableIdentifier(parentTable.getTableName + "_" + dataMapName,
Some(parentTable.getDatabaseName))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index f41cfc1..6e3e398 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, DataMapUtil}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -81,8 +81,9 @@ private[sql] case class CarbonAlterTableRenameCommand(
throw new MalformedCarbonCommandException("alter rename is not supported for index datamap")
}
// if table have create mv datamap, not support table rename
- if (CarbonTable.hasMVDataMap(oldCarbonTable)) {
- throw new MalformedCarbonCommandException("alter rename is not supported for mv datamap")
+ if (DataMapUtil.hasMVDataMap(oldCarbonTable) || oldCarbonTable.isChildTable) {
+ throw new MalformedCarbonCommandException(
+ "alter rename is not supported for datamap table or for tables which have child datamap")
}
var timeStamp = 0L
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 7d449b5..91d7675 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable
import org.apache.spark.sql.hive.{CarbonRelation, CreateCarbonSourceTableAsSelectCommand}
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.types.StructField
-import org.apache.spark.util.{CarbonReflectionUtils, FileUtils, SparkUtil}
+import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, FileUtils, SparkUtil}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -322,7 +322,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
throw new MalformedCarbonCommandException(
"Streaming property value is incorrect")
}
- if (CarbonTable.hasMVDataMap(carbonTable)) {
+ if (DataMapUtil.hasMVDataMap(carbonTable)) {
throw new MalformedCarbonCommandException(
"The table which has MV datamap does not support set streaming property")
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index d875fdf..96b6000 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
-import scala.collection.mutable
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql._
@@ -30,12 +29,11 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
-import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
+import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, SparkUtil}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil}
+import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
@@ -71,7 +69,7 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
"Update operation is not supported for pre-aggregate table")
}
val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
- if (CarbonTable.hasMVDataMap(carbonTable)) {
+ if (DataMapUtil.hasMVDataMap(carbonTable)) {
val allDataMapSchemas = DataMapStoreManager.getInstance
.getDataMapSchemasOfTable(carbonTable).asScala
.filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
@@ -214,7 +212,7 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
"Delete operation is not supported for pre-aggregate table")
}
val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
- if (CarbonTable.hasMVDataMap(carbonTable)) {
+ if (DataMapUtil.hasMVDataMap(carbonTable)) {
val allDataMapSchemas = DataMapStoreManager.getInstance
.getDataMapSchemasOfTable(carbonTable).asScala
.filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
new file mode 100644
index 0000000..24e28ad
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.execution.command.{DataMapField, Field}
+
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+/**
+ * Utility class for keeping all the utility methods common for pre-aggregate and mv datamap
+ */
+object DataMapUtil {
+
+ def inheritTablePropertiesFromMainTable(parentTable: CarbonTable,
+ fields: Seq[Field],
+ fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField],
+ tableProperties: mutable.Map[String, String]): Unit = {
+ var neworder = Seq[String]()
+ val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
+ parentOrder.foreach(parentcol =>
+ fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty &&
+ fieldRelationMap(col).columnTableRelationList.size == 1 &&
+ parentcol.equals(fieldRelationMap(col).
+ columnTableRelationList.get(0).parentColumnName))
+ .map(cols => neworder :+= cols.column))
+ tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(","))
+ tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable.
+ getTableProperties.asScala.getOrElse("sort_scope", CarbonCommonConstants
+ .LOAD_SORT_SCOPE_DEFAULT))
+ tableProperties
+ .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
+ tableProperties.put(CarbonCommonConstants.FLAT_FOLDER,
+ parentTable.getTableInfo.getFactTable.getTableProperties.asScala.getOrElse(
+ CarbonCommonConstants.FLAT_FOLDER, CarbonCommonConstants.DEFAULT_FLAT_FOLDER))
+
+ // Datamap table name and columns are automatically prefixed with the parent table name
+ // in carbon. For convenience, users can type the same column names as in the select
+ // statement when configuring dmproperties, and here we update the column names with
+ // the prefix.
+ // If longStringColumn is not present in dmproperties then we take long_string_columns
+ // from the parent table.
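+ // Illustrative example (hypothetical values): for a parent table `maintable` with
+ // long_string_columns = 'description', the datamap column is named
+ // `maintable_description`, so a user-supplied 'description' in dmproperties is
+ // mapped to that prefixed name below.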
+ var longStringColumn = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS)
+ if (longStringColumn.isEmpty) {
+ val longStringColumnInParents = parentTable.getTableInfo.getFactTable.getTableProperties
+ .asScala
+ .getOrElse(CarbonCommonConstants.LONG_STRING_COLUMNS, "").split(",").map(_.trim)
+ val varcharDatamapFields = scala.collection.mutable.ArrayBuffer.empty[String]
+ fieldRelationMap foreach (fields => {
+ val aggFunc = fields._2.aggregateFunction
+ val relationList = fields._2.columnTableRelationList
+ // check if columns present in datamap are long_string_col in parent table. If they are
+ // long_string_columns in parent, make them long_string_columns in datamap
+ if (aggFunc.isEmpty && relationList.size == 1 && longStringColumnInParents
+ .contains(relationList.head.head.parentColumnName)) {
+ varcharDatamapFields += relationList.head.head.parentColumnName
+ }
+ })
+ if (varcharDatamapFields.nonEmpty) {
+ longStringColumn = Option(varcharDatamapFields.mkString(","))
+ }
+ }
+
+ if (longStringColumn.isDefined) {
+ val fieldNames = fields.map(_.column)
+ val newLongStringColumn = longStringColumn.get.split(",").map(_.trim).map { colName =>
+ val newColName = parentTable.getTableName.toLowerCase() + "_" + colName
+ if (!fieldNames.contains(newColName)) {
+ throw new MalformedDataMapCommandException(
+ CarbonCommonConstants.LONG_STRING_COLUMNS.toUpperCase() + ":" + colName
+ + " does not in datamap")
+ }
+ newColName
+ }
+ tableProperties.put(CarbonCommonConstants.LONG_STRING_COLUMNS,
+ newLongStringColumn.mkString(","))
+ }
+
+ // inherit the local dictionary properties of main parent table
+ tableProperties
+ .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+ parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+ .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, "false"))
+ tableProperties
+ .put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+ parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+ .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+ CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT))
+ val parentDictInclude = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+ .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, "").split(",")
+
+ val parentDictExclude = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+ .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, "").split(",")
+
+ val newDictInclude =
+ parentDictInclude.flatMap(parentcol =>
+ fields.collect {
+ case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
+ fieldRelationMap(col).columnTableRelationList.size == 1 &&
+ parentcol.equals(fieldRelationMap(col).
+ columnTableRelationList.get.head.parentColumnName) =>
+ col.column
+ })
+
+ val newDictExclude = parentDictExclude.flatMap(parentcol =>
+ fields.collect {
+ // same single-parent-column condition as newDictInclude above
+ case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
+ fieldRelationMap(col).columnTableRelationList.size == 1 &&
+ parentcol.equals(fieldRelationMap(col).
+ columnTableRelationList.get.head.parentColumnName) =>
+ col.column
+ })
+ if (newDictInclude.nonEmpty) {
+ tableProperties
+ .put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, newDictInclude.mkString(","))
+ }
+ if (newDictExclude.nonEmpty) {
+ tableProperties
+ .put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, newDictExclude.mkString(","))
+ }
+ }
+
+ /**
+ * Returns true if an MV datamap is present on the specified table
+ */
+ @throws[IOException]
+ def hasMVDataMap(carbonTable: CarbonTable): Boolean = {
+ DataMapStoreManager.getInstance
+ .getDataMapSchemasOfTable(carbonTable).asScala
+ .exists(_.getProviderName.equalsIgnoreCase(MV.toString))
+ }
+}
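The commands touched by this commit combine the helper above with CarbonTable.isChildTable
to cover both directions of the restriction. A representative guard, mirroring the
rename and drop-partition call sites shown earlier in this diff:

    if (DataMapUtil.hasMVDataMap(carbonTable) || carbonTable.isChildTable) {
      throw new MalformedCarbonCommandException(
        "alter rename is not supported for datamap table or for tables which have child datamap")
    }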