Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/21 12:20:41 UTC
[carbondata] branch master updated: [CARBONDATA-3338] Support
Incremental DataLoad for MV Datamap [with single parent table]
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 595b1a4 [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap [with single parent table]
595b1a4 is described below
commit 595b1a46de30a3daf110f97406f1a2a2ecd82e0a
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Fri Apr 19 12:52:48 2019 +0530
[CARBONDATA-3338] Support Incremental DataLoad for MV Datamap [with single parent table]
Support Incremental DataLoad for MV Datamap [with single parent table], for both lazy and non-lazy MV datamaps
New DDL to support compaction on the MV datamap table.
Please refer https://issues.apache.org/jira/browse/CARBONDATA-3296 for more details
This closes #3179
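To make the feature concrete, here is a rough sketch of the user-facing flow. The CREATE and REBUILD statements mirror the test cases in this commit; the table and datamap names are illustrative, and the exact compaction DDL added via CarbonSpark2SqlParser is not shown in this excerpt, so its syntax below is an assumption.

    // Create an MV datamap on a single parent table. For a non-lazy datamap,
    // initData() now triggers a rebuild and enables it once it is in sync.
    sql("create datamap dm_sales using 'mv' as select empname, sum(utilization) from fact_table1 group by empname")
    // Lazy datamaps are refreshed on demand; the rebuild is now incremental,
    // loading only parent segments not yet covered by the MV table.
    sql("rebuild datamap dm_sales")
    // Assumed syntax for the new compaction DDL on the MV datamap table:
    sql("alter datamap dm_sales compact 'minor'")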
---
.../core/constants/CarbonCommonConstants.java | 11 +
.../carbondata/core/datamap/DataMapProvider.java | 227 +++++++-
.../carbondata/core/datamap/DataMapUtil.java | 20 +
.../core/datamap/dev/DataMapSyncStatus.java | 84 +++
.../datamap/status/DataMapSegmentStatusUtil.java | 78 +++
.../core/datamap/status/DataMapStatusManager.java | 71 ++-
.../status/DiskBasedDataMapStatusProvider.java | 7 +
.../core/metadata/schema/table/CarbonTable.java | 6 +
.../core/statusmanager/LoadMetadataDetails.java | 13 +
.../carbondata/mv/datamap/MVAnalyzerRule.scala | 30 +-
.../carbondata/mv/datamap/MVDataMapProvider.scala | 82 ++-
.../apache/carbondata/mv/datamap/MVHelper.scala | 26 +-
.../carbondata/mv/rewrite/MVCreateTestCase.scala | 67 +--
.../mv/rewrite/MVIncrementalLoadingTestcase.scala | 583 +++++++++++++++++++++
.../carbondata/mv/rewrite/MVSampleTestCase.scala | 8 -
.../carbondata/mv/rewrite/MVTPCDSTestCase.scala | 9 -
.../carbondata/mv/rewrite/MVTpchTestCase.scala | 14 -
.../src/test/resources/products.csv | 5 +
.../src/test/resources/sales_data.csv | 5 +
.../carbondata/datamap/IndexDataMapProvider.java | 9 +-
.../datamap/PreAggregateDataMapProvider.java | 9 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 9 +
.../scala/org/apache/spark/sql/CarbonEnv.scala | 8 +-
.../datamap/CarbonCreateDataMapCommand.scala | 11 +-
.../datamap/CarbonDataMapRebuildCommand.scala | 5 +-
.../CarbonAlterTableCompactionCommand.scala | 3 +
.../management/CarbonCleanFilesCommand.scala | 14 +
.../mutation/CarbonProjectForDeleteCommand.scala | 12 +
.../mutation/CarbonProjectForUpdateCommand.scala | 12 +-
.../execution/command/mv/DataMapListeners.scala | 141 +++++
.../spark/sql/hive/CarbonAnalysisRules.scala | 28 +-
.../spark/sql/parser/CarbonSpark2SqlParser.scala | 14 +-
.../processing/merger/CarbonDataMergerUtil.java | 24 +-
.../processing/util/CarbonLoaderUtil.java | 12 +
34 files changed, 1520 insertions(+), 137 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8ed9c1d..9375414 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2163,4 +2163,15 @@ public final class CarbonCommonConstants {
public static final int CARBON_INDEX_SERVER_WORKER_THREADS_DEFAULT =
500;
+
+ /**
+ * This property will be used to store the datamap name
+ */
+ public static final String DATAMAP_NAME = "datamap_name";
+
+ /**
+ * This property will be used to store the parent table names associated with the datamap
+ */
+ public static final String PARENT_TABLES = "parent_tables";
+
}
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
index cc05d31..fe2e7dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
@@ -18,13 +18,27 @@
package org.apache.carbondata.core.datamap;
import java.io.IOException;
+import java.util.*;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.status.DataMapSegmentStatusUtil;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.log4j.Logger;
/**
* DataMap is an accelerator for certain types of queries. Developers can add a new DataMap
@@ -58,6 +72,8 @@ public abstract class DataMapProvider {
private CarbonTable mainTable;
private DataMapSchema dataMapSchema;
+ private Logger LOGGER = LogServiceFactory.getLogService(this.getClass().getCanonicalName());
+
public DataMapProvider(CarbonTable mainTable, DataMapSchema dataMapSchema) {
this.mainTable = mainTable;
this.dataMapSchema = dataMapSchema;
@@ -106,13 +122,214 @@ public abstract class DataMapProvider {
* 1. after datamap creation and if `autoRefreshDataMap` is set to true
* 2. user manually triggers REBUILD DATAMAP command
*/
- public abstract void rebuild() throws IOException, NoSuchDataMapException;
+ public boolean rebuild() throws IOException, NoSuchDataMapException {
+ if (null == dataMapSchema.getRelationIdentifier()) {
+ return false;
+ }
+ String newLoadName = "";
+ String segmentMap = "";
+ AbsoluteTableIdentifier dataMapTableAbsoluteTableIdentifier = AbsoluteTableIdentifier
+ .from(dataMapSchema.getRelationIdentifier().getTablePath(),
+ dataMapSchema.getRelationIdentifier().getDatabaseName(),
+ dataMapSchema.getRelationIdentifier().getTableName());
+ SegmentStatusManager segmentStatusManager =
+ new SegmentStatusManager(dataMapTableAbsoluteTableIdentifier);
+ Map<String, List<String>> segmentMapping = new HashMap<>();
+ // Acquire table status lock to handle concurrent data loading
+ ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+ try {
+ if (carbonLock.lockWithRetries()) {
+ LOGGER.info(
+ "Acquired lock for table" + dataMapSchema.getRelationIdentifier().getDatabaseName()
+ + "." + dataMapSchema.getRelationIdentifier().getTableName()
+ + " for table status updation");
+ String dataMapTableMetadataPath =
+ CarbonTablePath.getMetadataPath(dataMapSchema.getRelationIdentifier().getTablePath());
+ LoadMetadataDetails[] loadMetaDataDetails =
+ SegmentStatusManager.readLoadMetadata(dataMapTableMetadataPath);
+ List<LoadMetadataDetails> listOfLoadFolderDetails =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ Collections.addAll(listOfLoadFolderDetails, loadMetaDataDetails);
+ if (dataMapSchema.isLazy()) {
+ // check if a rebuild of the datamap is already in progress and throw an exception
+ if (!listOfLoadFolderDetails.isEmpty()) {
+ for (LoadMetadataDetails loadMetaDetail : loadMetaDataDetails) {
+ if ((loadMetaDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
+ || loadMetaDetail.getSegmentStatus()
+ == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) && SegmentStatusManager
+ .isLoadInProgress(dataMapTableAbsoluteTableIdentifier,
+ loadMetaDetail.getLoadName())) {
+ throw new RuntimeException("Rebuild to datamap " + dataMapSchema.getDataMapName()
+ + " is already in progress");
+ }
+ }
+ }
+ }
+ boolean isFullRefresh = false;
+ if (null != dataMapSchema.getProperties().get("full_refresh")) {
+ isFullRefresh = Boolean.valueOf(dataMapSchema.getProperties().get("full_refresh"));
+ }
+ if (!isFullRefresh) {
+ if (!getSpecificSegmentsTobeLoaded(segmentMapping, listOfLoadFolderDetails)) {
+ return false;
+ }
+ segmentMap = new Gson().toJson(segmentMapping);
+ } else {
+ List<RelationIdentifier> relationIdentifiers = dataMapSchema.getParentTables();
+ for (RelationIdentifier relationIdentifier : relationIdentifiers) {
+ List<String> mainTableSegmentList =
+ DataMapUtil.getMainTableValidSegmentList(relationIdentifier);
+ if (mainTableSegmentList.isEmpty()) {
+ return false;
+ }
+ segmentMapping.put(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+ + relationIdentifier.getTableName(), mainTableSegmentList);
+ }
+ segmentMap = new Gson().toJson(segmentMapping);
+ }
+
+ // To handle concurrent data loading to the datamap, create a new loadMetaEntry,
+ // set segmentMap on it and pass the new segmentId with the load command
+ LoadMetadataDetails loadMetadataDetail = new LoadMetadataDetails();
+ String segmentId =
+ String.valueOf(SegmentStatusManager.createNewSegmentId(loadMetaDataDetails));
+ loadMetadataDetail.setLoadName(segmentId);
+ loadMetadataDetail.setSegmentStatus(SegmentStatus.INSERT_IN_PROGRESS);
+ loadMetadataDetail.setExtraInfo(segmentMap);
+ listOfLoadFolderDetails.add(loadMetadataDetail);
+ newLoadName = segmentId;
+
+ SegmentStatusManager.writeLoadDetailsIntoFile(CarbonTablePath
+ .getTableStatusFilePath(dataMapSchema.getRelationIdentifier().getTablePath()),
+ listOfLoadFolderDetails
+ .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
+ } else {
+ LOGGER.error(
+ "Not able to acquire the lock for Table status updation for table " + dataMapSchema
+ .getRelationIdentifier().getDatabaseName() + "." + dataMapSchema
+ .getRelationIdentifier().getTableName());
+ }
+ } finally {
+ if (carbonLock.unlock()) {
+ LOGGER.info("Table unlocked successfully after table status updation" + dataMapSchema
+ .getRelationIdentifier().getDatabaseName() + "." + dataMapSchema.getRelationIdentifier()
+ .getTableName());
+ } else {
+ LOGGER.error("Unable to unlock Table lock for table" + dataMapSchema.getRelationIdentifier()
+ .getDatabaseName() + "." + dataMapSchema.getRelationIdentifier().getTableName()
+ + " during table status updation");
+ }
+ }
+ return rebuildInternal(newLoadName, segmentMapping);
+ }
/**
- * Build the datamap incrementally by loading specified segment data
+ * This method compares the main table and dataMap table segment lists and loads only the
+ * newly added main table segments to the dataMap table.
+ * If the main table has been compacted, the dataMap is reloaded based on the
+ * dataMap-to-mainTable segmentMapping.
+ * Eg:
+ * case 1: Consider mainTableSegmentList {0, 1, 2} and dataMapToMainTable segmentMap
+ * {0 -> 0, 1 -> 1,2}. If main table segments (1, 2) are compacted to 1.1 and a new segment (3)
+ * is loaded to the main table, mainTableSegmentList becomes {0, 1.1, 3}.
+ * In this case, segment (1) of the dataMap table is marked for delete, and a new segment
+ * {2 -> 1.1, 3} is loaded to the dataMap table.
+ * case 2: Consider mainTableSegmentList {0, 1, 2, 3} and dataMapToMainTable segmentMap
+ * {0 -> 0,1,2, 1 -> 3}. If main table segments (1, 2) are compacted to 1.1 and a new segment
+ * (4) is loaded to the main table, mainTableSegmentList becomes {0, 1.1, 3, 4}.
+ * In this case, segment (0) of the dataMap table is marked for delete, and segment (0) of the
+ * main table is added to validSegmentList to be loaded again. A new dataMap table segment (2)
+ * with main table segmentList {2 -> 1.1, 4, 0} is then loaded to the dataMap table, and the
+ * dataMapToMainTable segmentMap becomes {1 -> 3, 2 -> 1.1, 4, 0} after the rebuild.
*/
- public void incrementalBuild(String[] segmentIds) {
- throw new UnsupportedOperationException();
+ private boolean getSpecificSegmentsTobeLoaded(Map<String, List<String>> segmentMapping,
+ List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
+ List<RelationIdentifier> relationIdentifiers = dataMapSchema.getParentTables();
+ // invalidDataMapSegmentList holds the dataMap segments which need to be marked for delete
+ HashSet<String> invalidDataMapSegmentList = new HashSet<>();
+ if (listOfLoadFolderDetails.isEmpty()) {
+ // If the segment map is empty, load all valid segments from the main tables to the dataMap
+ for (RelationIdentifier relationIdentifier : relationIdentifiers) {
+ List<String> mainTableSegmentList =
+ DataMapUtil.getMainTableValidSegmentList(relationIdentifier);
+ // If mainTableSegmentList is empty, no need to trigger load command
+ // TODO: handle in case of multiple tables load to datamap table
+ if (mainTableSegmentList.isEmpty()) {
+ return false;
+ }
+ segmentMapping.put(
+ relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT + relationIdentifier
+ .getTableName(), mainTableSegmentList);
+ }
+ } else {
+ for (RelationIdentifier relationIdentifier : relationIdentifiers) {
+ List<String> dataMapTableSegmentList = new ArrayList<>();
+ for (LoadMetadataDetails loadMetaDetail : listOfLoadFolderDetails) {
+ if (loadMetaDetail.getSegmentStatus() == SegmentStatus.SUCCESS
+ || loadMetaDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
+ Map<String, List<String>> segmentMaps =
+ DataMapSegmentStatusUtil.getSegmentMap(loadMetaDetail.getExtraInfo());
+ dataMapTableSegmentList.addAll(segmentMaps.get(
+ relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+ + relationIdentifier.getTableName()));
+ }
+ }
+ List<String> dataMapSegmentList = new ArrayList<>(dataMapTableSegmentList);
+ // Get all segments for parent relationIdentifier
+ List<String> mainTableSegmentList =
+ DataMapUtil.getMainTableValidSegmentList(relationIdentifier);
+ dataMapTableSegmentList.removeAll(mainTableSegmentList);
+ mainTableSegmentList.removeAll(dataMapSegmentList);
+ if (mainTableSegmentList.isEmpty()) {
+ return false;
+ }
+ if (!dataMapTableSegmentList.isEmpty()) {
+ List<String> invalidMainTableSegmentList = new ArrayList<>();
+ // validMainTableSegmentList holds segment list which needs to be loaded again
+ HashSet<String> validMainTableSegmentList = new HashSet<>();
+
+ // For dataMap segments which are not in the main table segment list (if the main table
+ // is compacted), iterate over those segments to find dataMap segments which need to
+ // be marked for delete and main table segments which need to be loaded again
+ for (String segmentId : dataMapTableSegmentList) {
+ for (LoadMetadataDetails loadMetaDetail : listOfLoadFolderDetails) {
+ if (loadMetaDetail.getSegmentStatus() == SegmentStatus.SUCCESS
+ || loadMetaDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
+ Map<String, List<String>> segmentMaps =
+ DataMapSegmentStatusUtil.getSegmentMap(loadMetaDetail.getExtraInfo());
+ List<String> segmentIds = segmentMaps.get(
+ relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+ + relationIdentifier.getTableName());
+ if (segmentIds.contains(segmentId)) {
+ segmentIds.remove(segmentId);
+ validMainTableSegmentList.addAll(segmentIds);
+ invalidMainTableSegmentList.add(segmentId);
+ invalidDataMapSegmentList.add(loadMetaDetail.getLoadName());
+ }
+ }
+ }
+ }
+ // remove invalid segment from validMainTableSegmentList if present
+ validMainTableSegmentList.removeAll(invalidMainTableSegmentList);
+ // Add all valid segments of the main table which need to be loaded again
+ mainTableSegmentList.addAll(validMainTableSegmentList);
+ segmentMapping.put(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+ + relationIdentifier.getTableName(), mainTableSegmentList);
+ } else {
+ segmentMapping.put(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+ + relationIdentifier.getTableName(), mainTableSegmentList);
+ }
+ }
+ }
+ // Remove invalid datamap segments
+ if (!invalidDataMapSegmentList.isEmpty()) {
+ for (LoadMetadataDetails loadMetadataDetail : listOfLoadFolderDetails) {
+ if (invalidDataMapSegmentList.contains(loadMetadataDetail.getLoadName())) {
+ loadMetadataDetail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+ }
+ }
+ }
+ return true;
}
/**
@@ -126,4 +343,6 @@ public abstract class DataMapProvider {
public abstract DataMapFactory getDataMapFactory();
public abstract boolean supportRebuild();
+
+ public abstract boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap);
}
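For intuition, the segmentMap persisted in the new load entry's extraInfo is a Gson-serialized map from "database.parentTable" to the parent segment ids covered by that datamap segment, exactly as built in rebuild() above. A minimal Scala sketch of the round trip (table name and segment ids are illustrative):

    import java.util
    import com.google.gson.Gson

    // Build the mapping the same way rebuild() does before serializing it
    val segmentMapping = new util.HashMap[String, util.List[String]]()
    segmentMapping.put("default.fact_table1", util.Arrays.asList("0", "1.1", "3"))
    val segmentMap = new Gson().toJson(segmentMapping)
    // segmentMap == {"default.fact_table1":["0","1.1","3"]}
    // DataMapSegmentStatusUtil.getSegmentMap(segmentMap) performs the reverse conversion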
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index 95c69c1..0a604fb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -30,7 +30,9 @@ import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
@@ -250,4 +252,22 @@ public class DataMapUtil {
return ssm.getValidAndInvalidSegments();
}
+ /**
+ * Returns valid segment list for a given RelationIdentifier
+ *
+ * @param relationIdentifier relation identifier of the table whose segments are listed
+ * @return list of valid segment ids
+ * @throws IOException
+ */
+ public static List<String> getMainTableValidSegmentList(RelationIdentifier relationIdentifier)
+ throws IOException {
+ List<String> segmentList = new ArrayList<>();
+ List<Segment> validSegments = new SegmentStatusManager(AbsoluteTableIdentifier
+ .from(relationIdentifier.getTablePath(), relationIdentifier.getDatabaseName(),
+ relationIdentifier.getTableName())).getValidAndInvalidSegments().getValidSegments();
+ for (Segment segment : validSegments) {
+ segmentList.add(segment.getSegmentNo());
+ }
+ return segmentList;
+ }
}
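A brief usage sketch of the new utility; dataMapSchema here is assumed to be an already-loaded DataMapSchema with at least one parent table:

    import org.apache.carbondata.core.datamap.DataMapUtil

    // List the valid (loadable) segments of the first parent table, e.g. ["0", "1.1", "3"]
    val parent = dataMapSchema.getParentTables.get(0)
    val validSegments = DataMapUtil.getMainTableValidSegmentList(parent)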
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java
new file mode 100644
index 0000000..eb7bf47
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap.dev;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapUtil;
+import org.apache.carbondata.core.datamap.status.DataMapSegmentStatusUtil;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * Utility to check whether a datamap can be enabled
+ */
+@InterfaceAudience.Developer("DataMap")
+public abstract class DataMapSyncStatus {
+
+ /**
+ * This method checks whether the main table and datamap table are in sync. If they are in
+ * sync, it returns true so the datamap can be enabled.
+ *
+ * @param dataMapSchema schema of the datamap to be enabled or disabled
+ * @return flag to enable or disable datamap
+ * @throws IOException
+ */
+ public static boolean canDataMapBeEnabled(DataMapSchema dataMapSchema) throws IOException {
+ boolean isDataMapInSync = true;
+ String metaDataPath =
+ CarbonTablePath.getMetadataPath(dataMapSchema.getRelationIdentifier().getTablePath());
+ LoadMetadataDetails[] dataMapLoadMetadataDetails =
+ SegmentStatusManager.readLoadMetadata(metaDataPath);
+ Map<String, List<String>> dataMapSegmentMap = new HashMap<>();
+ for (LoadMetadataDetails loadMetadataDetail : dataMapLoadMetadataDetails) {
+ Map<String, List<String>> segmentMap =
+ DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetail.getExtraInfo());
+ if (dataMapSegmentMap.isEmpty()) {
+ dataMapSegmentMap.putAll(segmentMap);
+ } else {
+ for (Map.Entry<String, List<String>> entry : segmentMap.entrySet()) {
+ if (null != dataMapSegmentMap.get(entry.getKey())) {
+ dataMapSegmentMap.get(entry.getKey()).addAll(entry.getValue());
+ }
+ }
+ }
+ }
+ List<RelationIdentifier> parentTables = dataMapSchema.getParentTables();
+ for (RelationIdentifier parentTable : parentTables) {
+ List<String> mainTableValidSegmentList =
+ DataMapUtil.getMainTableValidSegmentList(parentTable);
+ if (!mainTableValidSegmentList.isEmpty() && !dataMapSegmentMap.isEmpty()) {
+ isDataMapInSync = dataMapSegmentMap.get(
+ parentTable.getDatabaseName() + CarbonCommonConstants.POINT + parentTable
+ .getTableName()).containsAll(mainTableValidSegmentList);
+ } else if (dataMapSegmentMap.isEmpty() && !mainTableValidSegmentList.isEmpty()) {
+ isDataMapInSync = false;
+ }
+ }
+ return isDataMapInSync;
+ }
+}
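In effect, the datamap stays enabled only while the parent segments recorded across its load entries cover every valid segment of each parent table. The core of the check is a containsAll comparison, illustrated below with made-up segment ids:

    // Segments of the parent table covered by the datamap's load entries
    val dataMapCovered = java.util.Arrays.asList("0", "1", "2")
    // Currently valid segments of the parent table
    val mainTableValid = java.util.Arrays.asList("0", "1", "2", "3")
    // canDataMapBeEnabled(...) returns false here: segment 3 is not yet synced
    val inSync = dataMapCovered.containsAll(mainTableValid)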
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapSegmentStatusUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapSegmentStatusUtil.java
new file mode 100644
index 0000000..9e00104
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapSegmentStatusUtil.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap.status;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+
+import com.google.gson.Gson;
+
+/**
+ * Utility class to get updated segment mapping for datamap table
+ */
+public class DataMapSegmentStatusUtil {
+
+ /**
+ * This method converts the segment map JSON string back to a map and returns it
+ */
+ public static Map<String, List<String>> getSegmentMap(String segmentMap) {
+ Gson gson = new Gson();
+ return gson.fromJson(segmentMap, Map.class);
+ }
+
+ /**
+ * In case of compaction on the dataMap table, this method merges the segment lists of the
+ * main table and returns the updated segment mapping
+ *
+ * @param mergedLoadName to find which all segments are merged to new compacted segment
+ * @param dataMapSchema of datamap table
+ * @param loadMetadataDetails of datamap table
+ * @return updated segment map after merging segment list
+ */
+ public static String getUpdatedSegmentMap(String mergedLoadName, DataMapSchema dataMapSchema,
+ LoadMetadataDetails[] loadMetadataDetails) {
+ Map<String, List<String>> segmentMapping = new HashMap<>();
+ List<RelationIdentifier> relationIdentifiers = dataMapSchema.getParentTables();
+ for (RelationIdentifier relationIdentifier : relationIdentifiers) {
+ for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
+ if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.COMPACTED) {
+ if (mergedLoadName.equalsIgnoreCase(loadMetadataDetail.getMergedLoadName())) {
+ if (segmentMapping.isEmpty()) {
+ segmentMapping.putAll(getSegmentMap(loadMetadataDetail.getExtraInfo()));
+ } else {
+ segmentMapping.get(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+ + relationIdentifier.getTableName()).addAll(
+ getSegmentMap(loadMetadataDetail.getExtraInfo()).get(
+ relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+ + relationIdentifier.getTableName()));
+ }
+ }
+ }
+ }
+ }
+ Gson gson = new Gson();
+ return gson.toJson(segmentMapping);
+ }
+}
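A hedged usage sketch for getUpdatedSegmentMap: after segments of the datamap table are compacted, the merged load's extraInfo can be rebuilt from the COMPACTED entries. The metadataPath and dataMapSchema values are assumed to be in scope and to belong to the datamap table:

    import org.apache.carbondata.core.datamap.status.DataMapSegmentStatusUtil
    import org.apache.carbondata.core.statusmanager.SegmentStatusManager

    // Read the datamap table's current load details
    val loadDetails = SegmentStatusManager.readLoadMetadata(metadataPath)
    // Merge the parent-segment lists of all entries compacted into segment "0.1"
    val mergedExtraInfo =
      DataMapSegmentStatusUtil.getUpdatedSegmentMap("0.1", dataMapSchema, loadDetails)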
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
index c2f97ea..96b053f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
@@ -23,9 +23,19 @@ import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.log4j.Logger;
/**
* Maintains the status of each datamap. As per the status query will decide whether to hit datamap
@@ -38,6 +48,9 @@ public class DataMapStatusManager {
}
+ private static final Logger LOGGER =
+ LogServiceFactory.getLogService(DataMapStatusManager.class.getName());
+
/**
* TODO Use factory when we have more storage providers
*/
@@ -95,9 +108,7 @@ public class DataMapStatusManager {
DataMapStoreManager.getInstance().getDataMapSchemasOfTable(table);
List<DataMapSchema> dataMapToBeDisabled = new ArrayList<>(allDataMapSchemas.size());
for (DataMapSchema dataMap : allDataMapSchemas) {
- // TODO all non datamaps like MV is now supports only lazy. Once the support is made the
- // following check can be removed.
- if (dataMap.isLazy() || !dataMap.isIndexDataMap()) {
+ if (dataMap.isLazy()) {
dataMapToBeDisabled.add(dataMap);
}
}
@@ -122,9 +133,61 @@ public class DataMapStatusManager {
}
}
- private static DataMapSchema getDataMapSchema(String dataMapName)
+ public static DataMapSchema getDataMapSchema(String dataMapName)
throws IOException, NoSuchDataMapException {
return DataMapStoreManager.getInstance().getDataMapSchema(dataMapName);
}
+ /**
+ * This method removes all segments of the dataMap table in case of Insert-Overwrite, Update
+ * or Delete operations on the main table
+ *
+ * @param allDataMapSchemas datamap schemas of the main carbon table
+ * @throws IOException
+ */
+ public static void truncateDataMap(List<DataMapSchema> allDataMapSchemas)
+ throws IOException, NoSuchDataMapException {
+ for (DataMapSchema datamapschema : allDataMapSchemas) {
+ if (!datamapschema.isLazy()) {
+ disableDataMap(datamapschema.getDataMapName());
+ }
+ RelationIdentifier dataMapRelationIdentifier = datamapschema.getRelationIdentifier();
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(AbsoluteTableIdentifier
+ .from(dataMapRelationIdentifier.getTablePath(),
+ dataMapRelationIdentifier.getDatabaseName(),
+ dataMapRelationIdentifier.getTableName()));
+ ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+ try {
+ if (carbonLock.lockWithRetries()) {
+ LOGGER.info("Acquired lock for table" + dataMapRelationIdentifier.getDatabaseName() + "."
+ + dataMapRelationIdentifier.getTableName() + " for table status update");
+ String metaDataPath =
+ CarbonTablePath.getMetadataPath(dataMapRelationIdentifier.getTablePath());
+ LoadMetadataDetails[] loadMetadataDetails =
+ SegmentStatusManager.readLoadMetadata(metaDataPath);
+ for (LoadMetadataDetails entry : loadMetadataDetails) {
+ entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+ }
+ SegmentStatusManager.writeLoadDetailsIntoFile(
+ CarbonTablePath.getTableStatusFilePath(dataMapRelationIdentifier.getTablePath()),
+ loadMetadataDetails);
+ } else {
+ LOGGER.error("Not able to acquire the lock for Table status updation for table "
+ + dataMapRelationIdentifier.getDatabaseName() + "." + dataMapRelationIdentifier
+ .getTableName());
+ }
+ } finally {
+ if (carbonLock.unlock()) {
+ LOGGER.info(
+ "Table unlocked successfully after table status updation" + dataMapRelationIdentifier
+ .getDatabaseName() + "." + dataMapRelationIdentifier.getTableName());
+ } else {
+ LOGGER.error(
+ "Unable to unlock Table lock for table" + dataMapRelationIdentifier.getDatabaseName()
+ + "." + dataMapRelationIdentifier.getTableName()
+ + " during table status updation");
+ }
+ }
+ }
+ }
}
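As a consequence of this change, a mutation on the main table now invalidates the MV data instead of leaving it stale. An illustrative sequence, using CarbonData update syntax and the fact_table1 naming from the tests in this commit:

    // Any update/delete/insert-overwrite on the parent table...
    sql("update fact_table1 set (designation) = ('SE') where empname = 'shivani'")
    // ...causes truncateDataMap() to disable each non-lazy MV and mark every
    // segment of its datamap table MARKED_FOR_DELETE until the MV is refreshed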
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
index 27cb1cc..723010e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
@@ -31,6 +31,7 @@ import java.util.List;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.DataMapSyncStatus;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
@@ -114,6 +115,12 @@ public class DiskBasedDataMapStatusProvider implements DataMapStatusStorageProvi
locked = carbonTableStatusLock.lockWithRetries();
if (locked) {
LOG.info("Datamap status lock has been successfully acquired.");
+ if (dataMapStatus == DataMapStatus.ENABLED && !dataMapSchemas.get(0).isIndexDataMap()) {
+ // Enable the datamap only if the datamap table and main table are in sync
+ if (!DataMapSyncStatus.canDataMapBeEnabled(dataMapSchemas.get(0))) {
+ return;
+ }
+ }
DataMapStatusDetail[] dataMapStatusDetails = getDataMapStatusDetails();
List<DataMapStatusDetail> dataMapStatusList = Arrays.asList(dataMapStatusDetails);
dataMapStatusList = new ArrayList<>(dataMapStatusList);
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 0f1f628..afb5fd3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1018,6 +1018,12 @@ public class CarbonTable implements Serializable, Writable {
!tableInfo.getParentRelationIdentifiers().isEmpty();
}
+ public boolean isChildTable() {
+ return null != tableInfo.getFactTable().getTableProperties()
+ .get(CarbonCommonConstants.PARENT_TABLES) && !tableInfo.getFactTable().getTableProperties()
+ .get(CarbonCommonConstants.PARENT_TABLES).isEmpty();
+ }
+
/**
* Return true if this is an external table (table with property "_external"="true", this is
* an internal table property set during table creation)
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 99dddba..54a6d0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -133,6 +133,11 @@ public class LoadMetadataDetails implements Serializable {
return partitionCount;
}
+ /**
+ * extraInfo contains the segment mapping information for the datamap table
+ */
+ private String extraInfo;
+
@Deprecated
public void setPartitionCount(String partitionCount) {
this.partitionCount = partitionCount;
@@ -434,4 +439,12 @@ public class LoadMetadataDetails implements Serializable {
return "LoadMetadataDetails{" + "loadStatus=" + loadStatus + ", loadName='" + loadName + '\''
+ ", loadStartTime='" + loadStartTime + '\'' + ", segmentFile='" + segmentFile + '\'' + '}';
}
+
+ public String getExtraInfo() {
+ return extraInfo;
+ }
+
+ public void setExtraInfo(String extraInfo) {
+ this.extraInfo = extraInfo;
+ }
}
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
index bb56e58..315c66b 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
@@ -27,9 +27,11 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.datamap.DataMapManager
import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}
@@ -88,7 +90,8 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
if (!plan.isInstanceOf[Command] && !plan.isInstanceOf[DeserializeToObject]) {
val catalogs = extractCatalogs(plan)
!isDataMapReplaced(catalog.listAllValidSchema(), catalogs) &&
- isDataMapExists(catalog.listAllValidSchema(), catalogs)
+ isDataMapExists(catalog.listAllValidSchema(), catalogs) &&
+ !isSegmentSetForMainTable(catalogs)
} else {
false
}
@@ -136,4 +139,29 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
}
catalogs
}
+
+ /**
+ * Check if any segments are set on the main table for the query. If any segments are set,
+ * skip the MV datamap table for the query
+ */
+ def isSegmentSetForMainTable(catalogs: Seq[Option[CatalogTable]]): Boolean = {
+ catalogs.foreach { c =>
+ val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+ if (carbonSessionInfo != null) {
+ val segmentsToQuery = carbonSessionInfo.getSessionParams
+ .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ c.get.identifier.database.get + "." +
+ c.get.identifier.table, "")
+ if (segmentsToQuery.isEmpty || segmentsToQuery.equalsIgnoreCase("*")) {
+ return false
+ } else {
+ return true
+ }
+ } else {
+ return false
+ }
+ }
+ false
+ }
+
}
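For example, once a session pins input segments on a parent table, queries bypass the MV until the property is unset. The property key is CARBON_INPUT_SEGMENTS plus "db.table", as used above; the sketch assumes a CarbonSession-backed test context where sql(...) is available:

    import org.apache.spark.sql.CarbonSession

    // Pin segments 1 and 2 of the parent table for this thread
    CarbonSession.threadSet("carbon.input.segments.default.fact_table1", "1,2")
    // This query is now answered from fact_table1, not from the MV datamap
    sql("select empname, designation from fact_table1").show()
    // Unset to restore MV rewriting
    CarbonSession.threadUnset("carbon.input.segments.default.fact_table1")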
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
index 5ffc46a..26c4cb6 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -18,7 +18,9 @@ package org.apache.carbondata.mv.datamap
import java.io.IOException
-import org.apache.spark.sql.SparkSession
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonSession, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
@@ -29,8 +31,11 @@ import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.common.annotations.InterfaceAudience
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapCatalog, DataMapProvider, DataMapStoreManager}
import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory}
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.indexstore.Blocklet
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}
@@ -43,6 +48,8 @@ class MVDataMapProvider(
extends DataMapProvider(mainTable, dataMapSchema) {
protected var dropTableCommand: CarbonDropTableCommand = null
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
@throws[MalformedDataMapCommandException]
@throws[IOException]
override def initMeta(ctasSqlStatement: String): Unit = {
@@ -55,6 +62,11 @@ class MVDataMapProvider(
}
override def initData(): Unit = {
+ if (!dataMapSchema.isLazy) {
+ if (rebuild()) {
+ DataMapStatusManager.enableDataMap(dataMapSchema.getDataMapName)
+ }
+ }
}
@throws[IOException]
@@ -75,7 +87,8 @@ class MVDataMapProvider(
}
@throws[IOException]
- override def rebuild(): Unit = {
+ override def rebuildInternal(newLoadName: String,
+ segmentMap: java.util.Map[String, java.util.List[String]]): Boolean = {
val ctasQuery = dataMapSchema.getCtasQuery
if (ctasQuery != null) {
val identifier = dataMapSchema.getRelationIdentifier
@@ -91,29 +104,78 @@ class MVDataMapProvider(
val queryPlan = SparkSQLUtil.execute(
sparkSession.sql(updatedQuery).queryExecution.analyzed,
sparkSession).drop("preAgg")
- val header = logicalPlan.output.map(_.name).mkString(",")
+ var isOverwriteTable = false
+ val isFullRefresh =
+ if (null != dataMapSchema.getProperties.get("full_refresh")) {
+ dataMapSchema.getProperties.get("full_refresh").toBoolean
+ } else {
+ false
+ }
+ if (isFullRefresh) {
+ isOverwriteTable = true
+ }
+ val dataMapTable = CarbonTable
+ .buildFromTablePath(identifier.getTableName,
+ identifier.getDatabaseName,
+ identifier.getTablePath,
+ identifier.getTableId)
+ // Set specified segments for incremental load
+ val segmentMapIterator = segmentMap.entrySet().iterator()
+ while (segmentMapIterator.hasNext) {
+ val entry = segmentMapIterator.next()
+ setSegmentsToLoadDataMap(entry.getKey, entry.getValue)
+ }
+ val header = dataMapTable.getTableInfo.getFactTable.getListOfColumns.asScala
+ .filter { column =>
+ !column.getColumnName
+ .equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
+ }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
val loadCommand = CarbonLoadDataCommand(
databaseNameOp = Some(identifier.getDatabaseName),
tableName = identifier.getTableName,
factPathFromUser = null,
dimFilesPath = Seq(),
options = scala.collection.immutable.Map("fileheader" -> header),
- isOverwriteTable = true,
+ isOverwriteTable,
inputSqlString = null,
dataFrame = Some(queryPlan),
updateModel = None,
tableInfoOp = None,
- internalOptions = Map.empty,
+ internalOptions = Map("mergedSegmentName" -> newLoadName),
partition = Map.empty)
- SparkSQLUtil.execute(loadCommand, sparkSession)
+ try {
+ SparkSQLUtil.execute(loadCommand, sparkSession)
+ } catch {
+ case ex: Exception =>
+ DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+ LOGGER.error("Data Load failed for DataMap: ", ex)
+ return false
+ } finally {
+ unsetMainTableSegments()
+ }
}
+ true
}
- @throws[IOException]
- override def incrementalBuild(
- segmentIds: Array[String]): Unit = {
- throw new UnsupportedOperationException
+ /**
+ * This method sets the main table segments which need to be loaded to the MV dataMap
+ */
+ private def setSegmentsToLoadDataMap(tableUniqueName: String,
+ mainTableSegmentList: java.util.List[String]): Unit = {
+ CarbonSession
+ .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ tableUniqueName, mainTableSegmentList.asScala.mkString(","))
+ }
+
+ private def unsetMainTableSegments(): Unit = {
+ val relationIdentifiers = dataMapSchema.getParentTables.asScala
+ for (relationIdentifier <- relationIdentifiers) {
+ CarbonSession
+ .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ relationIdentifier.getDatabaseName + "." +
+ relationIdentifier.getTableName)
+ }
}
override def createDataMapCatalog : DataMapCatalog[SummaryDataset] =
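Putting rebuildInternal together: a non-lazy incremental refresh conceptually pins the not-yet-synced parent segments, re-runs the CTAS query, and loads the result into the pre-created datamap segment. A simplified sketch, where sparkSession, dataMapSchema, newLoadName and the segment ids are illustrative:

    // 1. Restrict the parent table scan to the segments being synced
    CarbonSession.threadSet("carbon.input.segments.default.fact_table1", "1.1,3")
    try {
      // 2. Re-run the datamap's defining query; only pinned segments are read
      val df = sparkSession.sql(dataMapSchema.getCtasQuery)
      // 3. CarbonLoadDataCommand then writes df into the datamap segment that
      //    rebuild() pre-created, via internalOptions("mergedSegmentName" -> newLoadName)
    } finally {
      // 4. Always restore the session, as unsetMainTableSegments() does
      CarbonSession.threadUnset("carbon.input.segments.default.fact_table1")
    }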
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 9bed098..6d0b2d3 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -27,14 +27,17 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Expression, NamedExpression, ScalaUDF, SortOrder}
import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan, Project}
import org.apache.spark.sql.execution.command.{Field, TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier}
import org.apache.carbondata.datamap.DataMapManager
@@ -81,6 +84,7 @@ object MVHelper {
dmProperties.foreach(t => tableProperties.put(t._1, t._2))
val selectTables = getTables(logicalPlan)
+ val parentTables = new util.ArrayList[String]()
selectTables.foreach { selectTable =>
val mainCarbonTable = try {
Some(CarbonEnv.getCarbonTable(selectTable.identifier.database,
@@ -89,12 +93,14 @@ object MVHelper {
// Exception handling if it's not a CarbonTable
case ex : Exception => None
}
-
+ parentTables.add(mainCarbonTable.get.getTableName)
if (!mainCarbonTable.isEmpty && mainCarbonTable.get.isStreamingSink) {
throw new MalformedCarbonCommandException(
s"Streaming table does not support creating MV datamap")
}
}
+ tableProperties.put(CarbonCommonConstants.DATAMAP_NAME, dataMapSchema.getDataMapName)
+ tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(","))
// TODO inherit the table properties like sort order, sort scope and block size from parent
// tables to mv datamap table
@@ -127,14 +133,20 @@ object MVHelper {
dataMapSchema
.setRelationIdentifier(new RelationIdentifier(tableIdentifier.database.get,
tableIdentifier.table,
- ""))
+ CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession).getTableId))
val parentIdents = selectTables.map { table =>
- new RelationIdentifier(table.database, table.identifier.table, "")
+ val relationIdentifier = new RelationIdentifier(table.database, table.identifier.table, "")
+ relationIdentifier.setTablePath(FileFactory.getUpdatedFilePath(table.location.toString))
+ relationIdentifier
}
+ dataMapSchema.getRelationIdentifier.setTablePath(tablePath)
dataMapSchema.setParentTables(new util.ArrayList[RelationIdentifier](parentIdents.asJava))
dataMapSchema.getProperties.put("full_refresh", fullRebuild.toString)
DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
+ if (dataMapSchema.isLazy) {
+ DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+ }
}
private def validateMVQuery(sparkSession: SparkSession,
@@ -241,6 +253,12 @@ object MVHelper {
}.isDefined || isFullReload
exp
}
+ // TODO:- Remove this case when incremental data loading is supported for multiple tables
+ logicalPlan.transformDown {
+ case join@Join(l1, l2, jointype, condition) =>
+ isFullReload = true
+ join
+ }
isFullReload
}
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 16c0567..5016bbe 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -105,7 +105,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and same projection") {
sql("drop datamap if exists datamap1")
sql("create datamap datamap1 using 'mv' as select empname, designation from fact_table1")
- sql(s"rebuild datamap datamap1")
val df = sql("select empname,designation from fact_table1")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap1"))
@@ -116,7 +115,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and sub projection") {
sql("drop datamap if exists datamap2")
sql("create datamap datamap2 using 'mv' as select empname, designation from fact_table1")
- sql(s"rebuild datamap datamap2")
val df = sql("select empname from fact_table1")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap2"))
@@ -127,7 +125,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and same projection with projection filter") {
sql("drop datamap if exists datamap3")
sql("create datamap datamap3 using 'mv' as select empname, designation from fact_table1")
- sql(s"rebuild datamap datamap3")
val frame = sql("select empname, designation from fact_table1 where empname='shivani'")
val analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap3"))
@@ -138,7 +135,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and sub projection with non projection filter") {
sql("create datamap datamap4 using 'mv' as select empname, designation from fact_table1")
- sql(s"rebuild datamap datamap4")
val frame = sql("select designation from fact_table1 where empname='shivani'")
val analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap4"))
@@ -148,7 +144,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and sub projection with datamap filter") {
sql("create datamap datamap5 using 'mv' as select empname, designation from fact_table1 where empname='shivani'")
- sql(s"rebuild datamap datamap5")
val frame = sql("select designation from fact_table1 where empname='shivani'")
val analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap5"))
@@ -158,7 +153,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and same projection with datamap filter ") {
sql("create datamap datamap6 using 'mv' as select empname, designation from fact_table1 where empname='shivani'")
- sql(s"rebuild datamap datamap6")
val frame = sql("select empname,designation from fact_table1 where empname='shivani'")
val analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap6"))
@@ -168,7 +162,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and same projection with datamap filter and extra query column filter") {
sql("create datamap datamap7 using 'mv' as select empname, designation from fact_table1 where empname='shivani'")
- sql(s"rebuild datamap datamap7")
val frame = sql(
"select empname,designation from fact_table1 where empname='shivani' and designation='SA'")
val analyzed = frame.queryExecution.analyzed
@@ -179,7 +172,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and same projection with datamap filter and different column filter") {
sql("create datamap datamap8 using 'mv' as select empname, designation from fact_table1 where empname='shivani'")
- sql(s"rebuild datamap datamap8")
val frame = sql("select empname,designation from fact_table1 where designation='SA'")
val analyzed = frame.queryExecution.analyzed
assert(!verifyMVDataMap(analyzed, "datamap8"))
@@ -189,7 +181,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and same projection with datamap filter on non projection column and extra column filter") {
sql("create datamap datamap9 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
- sql(s"rebuild datamap datamap9")
val frame = sql("select empname,designation from fact_table1 where deptname='cloud'")
val analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap9"))
@@ -199,7 +190,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and same projection with datamap filter on non projection column and no column filter") {
sql("create datamap datamap10 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
- sql(s"rebuild datamap datamap10")
val frame = sql("select empname,designation from fact_table1")
val analyzed = frame.queryExecution.analyzed
assert(!verifyMVDataMap(analyzed, "datamap10"))
@@ -209,7 +199,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and same projection with datamap filter on non projection column and different column filter") {
sql("create datamap datamap11 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
- sql(s"rebuild datamap datamap11")
val frame = sql("select empname,designation from fact_table1 where designation='SA'")
val analyzed = frame.queryExecution.analyzed
assert(!verifyMVDataMap(analyzed, "datamap11"))
@@ -220,7 +209,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and same group by query") {
sql("drop datamap if exists datamap12")
sql("create datamap datamap12 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname")
- sql(s"rebuild datamap datamap12")
val frame = sql("select empname, sum(utilization) from fact_table1 group by empname")
val analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap12"))
@@ -231,7 +219,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and sub group by query") {
sql("drop datamap if exists datamap13")
sql("create datamap datamap13 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname")
- sql(s"rebuild datamap datamap13")
val frame = sql("select sum(utilization) from fact_table1 group by empname")
val analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap13"))
@@ -242,7 +229,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and sub group by query with filter on query") {
sql("drop datamap if exists datamap14")
sql("create datamap datamap14 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname")
- sql(s"rebuild datamap datamap14")
val frame = sql(
"select empname,sum(utilization) from fact_table1 group by empname having empname='shivani'")
val analyzed = frame.queryExecution.analyzed
@@ -254,7 +240,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and sub group and sub projection by query with filter on query") {
sql("drop datamap if exists datamap32")
sql("create datamap datamap32 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname")
- sql(s"rebuild datamap datamap32")
val frame = sql(
"select empname, sum(utilization) from fact_table1 group by empname having empname='shivani'")
val analyzed = frame.queryExecution.analyzed
@@ -265,7 +250,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and sub group by query with filter on datamap") {
sql("create datamap datamap15 using 'mv' as select empname, sum(utilization) from fact_table1 where empname='shivani' group by empname")
- sql(s"rebuild datamap datamap15")
val frame = sql(
"select empname,sum(utilization) from fact_table1 where empname='shivani' group by empname")
val analyzed = frame.queryExecution.analyzed
@@ -276,7 +260,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and sub group by query with filter on datamap and no filter on query") {
sql("create datamap datamap16 using 'mv' as select empname, sum(utilization) from fact_table1 where empname='shivani' group by empname")
- sql(s"rebuild datamap datamap16")
val frame = sql("select empname,sum(utilization) from fact_table1 group by empname")
val analyzed = frame.queryExecution.analyzed
assert(!verifyMVDataMap(analyzed, "datamap16"))
@@ -286,7 +269,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and same group by with expression") {
sql("create datamap datamap17 using 'mv' as select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
- sql(s"rebuild datamap datamap17")
val frame = sql(
"select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group" +
" by empname")
@@ -300,7 +282,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and sub group by with expression") {
sql("drop datamap if exists datamap18")
sql("create datamap datamap18 using 'mv' as select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
- sql(s"rebuild datamap datamap18")
val frame = sql(
"select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
val analyzed = frame.queryExecution.analyzed
@@ -312,7 +293,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and sub count group by with expression") {
sql("drop datamap if exists datamap19")
sql("create datamap datamap19 using 'mv' as select empname, count(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
- sql(s"rebuild datamap datamap19")
val frame = sql(
"select count(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
val analyzed = frame.queryExecution.analyzed
@@ -324,7 +304,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and sub group by with expression and filter on query") {
sql("drop datamap if exists datamap20")
sql("create datamap datamap20 using 'mv' as select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
- sql(s"rebuild datamap datamap20")
val frame = sql(
"select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 where " +
"empname='shivani' group by empname")
@@ -338,7 +317,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple join") {
sql("drop datamap if exists datamap21")
sql("create datamap datamap21 using 'mv' as select t1.empname as c1, t2.designation, t2.empname as c2 from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)")
- sql(s"rebuild datamap datamap21")
val frame = sql(
"select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname")
val analyzed = frame.queryExecution.analyzed
@@ -350,7 +328,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple join and filter on query") {
sql("drop datamap if exists datamap22")
sql("create datamap datamap22 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)")
- sql(s"rebuild datamap datamap22")
val frame = sql(
"select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = " +
"t2.empname and t1.empname='shivani'")
@@ -365,7 +342,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple join and filter on query and datamap") {
sql("drop datamap if exists datamap23")
sql("create datamap datamap23 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'")
- sql(s"rebuild datamap datamap23")
val frame = sql(
"select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = " +
"t2.empname and t1.empname='shivani'")
@@ -379,7 +355,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple join and filter on datamap and no filter on query") {
sql("drop datamap if exists datamap24")
sql("create datamap datamap24 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'")
- sql(s"rebuild datamap datamap24")
val frame = sql(
"select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname")
val analyzed = frame.queryExecution.analyzed
@@ -391,7 +366,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with multiple join") {
sql("drop datamap if exists datamap25")
sql("create datamap datamap25 using 'mv' as select t1.empname as c1, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) inner join fact_table3 t3 on (t1.empname=t3.empname)")
- sql(s"rebuild datamap datamap25")
val frame = sql(
"select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname")
val analyzed = frame.queryExecution.analyzed
@@ -406,7 +380,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with simple join on datamap and multi join on query") {
sql("create datamap datamap26 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)")
- sql(s"rebuild datamap datamap26")
val frame = sql(
"select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2,fact_table3 " +
"t3 where t1.empname = t2.empname and t1.empname=t3.empname")
@@ -419,7 +392,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with join with group by") {
sql("create datamap datamap27 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
- sql(s"rebuild datamap datamap27")
val frame = sql(
"select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " +
"where t1.empname = t2.empname group by t1.empname, t2.designation")
@@ -433,7 +405,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with join with group by and sub projection") {
sql("drop datamap if exists datamap28")
sql("create datamap datamap28 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
- sql(s"rebuild datamap datamap28")
val frame = sql(
"select t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where " +
"t1.empname = t2.empname group by t2.designation")
@@ -447,7 +418,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with join with group by and sub projection with filter") {
sql("drop datamap if exists datamap29")
sql("create datamap datamap29 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
- sql(s"rebuild datamap datamap29")
val frame = sql(
"select t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where " +
"t1.empname = t2.empname and t1.empname='shivani' group by t2.designation")
@@ -461,7 +431,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with join with group by with filter") {
sql("drop datamap if exists datamap30")
sql("create datamap datamap30 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
- sql(s"rebuild datamap datamap30")
val frame = sql(
"select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " +
"where t1.empname = t2.empname and t2.designation='SA' group by t1.empname, t2.designation")
@@ -475,7 +444,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with expression on projection") {
sql(s"drop datamap if exists datamap31")
sql("create datamap datamap31 using 'mv' as select empname, designation, utilization, projectcode from fact_table1 ")
- sql(s"rebuild datamap datamap31")
val frame = sql(
"select empname, designation, utilization+projectcode from fact_table1")
val analyzed = frame.queryExecution.analyzed
@@ -487,7 +455,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with simple and sub group by query and count agg") {
sql(s"drop datamap if exists datamap32")
sql("create datamap datamap32 using 'mv' as select empname, count(utilization) from fact_table1 group by empname")
- sql(s"rebuild datamap datamap32")
val frame = sql("select empname,count(utilization) from fact_table1 where empname='shivani' group by empname")
val analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap32"))
@@ -498,7 +465,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with simple and sub group by query and avg agg") {
sql(s"drop datamap if exists datamap33")
sql("create datamap datamap33 using 'mv' as select empname, avg(utilization) from fact_table1 group by empname")
- sql(s"rebuild datamap datamap33")
val frame = sql("select empname,avg(utilization) from fact_table1 where empname='shivani' group by empname")
val analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap33"))
@@ -509,7 +475,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with left join with group by") {
sql("drop datamap if exists datamap34")
sql("create datamap datamap34 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation")
- sql(s"rebuild datamap datamap34")
val frame = sql(
"select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " +
"on t1.empname = t2.empname group by t1.empname, t2.designation")
@@ -522,7 +487,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with simple and group by query with filter on datamap but not on projection") {
sql("create datamap datamap35 using 'mv' as select designation, sum(utilization) from fact_table1 where empname='shivani' group by designation")
- sql(s"rebuild datamap datamap35")
val frame = sql(
"select designation, sum(utilization) from fact_table1 where empname='shivani' group by designation")
val analyzed = frame.queryExecution.analyzed
@@ -533,7 +497,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with simple and sub group by query with filter on datamap but not on projection") {
sql("create datamap datamap36 using 'mv' as select designation, sum(utilization) from fact_table1 where empname='shivani' group by designation")
- sql(s"rebuild datamap datamap36")
val frame = sql(
"select sum(utilization) from fact_table1 where empname='shivani' group by designation")
val analyzed = frame.queryExecution.analyzed
@@ -545,7 +508,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with agg push join with sub group by ") {
sql("drop datamap if exists datamap37")
sql("create datamap datamap37 using 'mv' as select empname, designation, sum(utilization) from fact_table1 group by empname, designation")
- sql(s"rebuild datamap datamap37")
val frame = sql(
"select t1.empname, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " +
"where t1.empname = t2.empname group by t1.empname")
@@ -559,7 +521,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with agg push join with group by ") {
sql("drop datamap if exists datamap38")
sql("create datamap datamap38 using 'mv' as select empname, designation, sum(utilization) from fact_table1 group by empname, designation")
- sql(s"rebuild datamap datamap38")
val frame = sql(
"select t1.empname, t1.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " +
"where t1.empname = t2.empname group by t1.empname,t1.designation")
@@ -573,7 +534,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with agg push join with group by with filter") {
sql("drop datamap if exists datamap39")
sql("create datamap datamap39 using 'mv' as select empname, designation, sum(utilization) from fact_table1 group by empname, designation ")
- sql(s"rebuild datamap datamap39")
val frame = sql(
"select t1.empname, t1.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " +
"where t1.empname = t2.empname and t1.empname='shivani' group by t1.empname,t1.designation")
@@ -587,7 +547,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with more agg push join with group by with filter") {
sql("drop datamap if exists datamap40")
sql("create datamap datamap40 using 'mv' as select empname, designation, sum(utilization), count(utilization) from fact_table1 group by empname, designation ")
- sql(s"rebuild datamap datamap40")
val frame = sql(
"select t1.empname, t1.designation, sum(t1.utilization),count(t1.utilization) from fact_table1 t1,fact_table2 t2 " +
"where t1.empname = t2.empname and t1.empname='shivani' group by t1.empname,t1.designation")
@@ -601,7 +560,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with left join with group by with filter") {
sql("drop datamap if exists datamap41")
sql("create datamap datamap41 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation")
- sql(s"rebuild datamap datamap41")
val frame = sql(
"select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " +
"on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname, t2.designation")
@@ -615,7 +573,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with left join with sub group by") {
sql("drop datamap if exists datamap42")
sql("create datamap datamap42 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation")
- sql(s"rebuild datamap datamap42")
val frame = sql(
"select t1.empname, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " +
"on t1.empname = t2.empname group by t1.empname")
@@ -629,7 +586,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with left join with sub group by with filter") {
sql("drop datamap if exists datamap43")
sql("create datamap datamap43 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation")
- sql(s"rebuild datamap datamap43")
val frame = sql(
"select t1.empname, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " +
"on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname")
@@ -643,7 +599,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with left join with sub group by with filter on mv") {
sql("drop datamap if exists datamap44")
sql("create datamap datamap44 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname, t2.designation")
- sql(s"rebuild datamap datamap44")
val frame = sql(
"select t1.empname, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " +
"on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname")
@@ -658,7 +613,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop datamap if exists datamap45")
sql("create datamap datamap45 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation")
- sql(s"rebuild datamap datamap45")
// During optimization, Spark converts a left outer join into an equi join when a filter is present on the right-side table
val frame = sql(
"select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " +
@@ -678,7 +632,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql(" insert into test4 select 'babu',12,12").show()
sql("create datamap mv13 using 'mv' as select name,sum(salary) from test4 group by name")
- sql("rebuild datamap mv13")
val frame = sql(
"select name,sum(salary) from test4 group by name")
val analyzed = frame.queryExecution.analyzed
@@ -689,7 +642,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop datamap if exists MV_order")
sql("create datamap MV_order using 'mv' as select empname,sum(salary) as total from fact_table1 group by empname")
- sql("rebuild datamap MV_order")
val frame = sql(
"select empname,sum(salary) as total from fact_table1 group by empname order by empname")
val analyzed = frame.queryExecution.analyzed
@@ -701,7 +653,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop datamap if exists MV_order")
sql("drop datamap if exists MV_desc_order")
sql("create datamap MV_order using 'mv' as select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname")
- sql("rebuild datamap MV_order")
val frame = sql(
"select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname order by empname")
val analyzed = frame.queryExecution.analyzed
@@ -712,7 +663,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop datamap if exists MV_order")
sql("create datamap MV_order using 'mv' as select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname order by empname DESC")
- sql("rebuild datamap MV_order")
val frame = sql(
"select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname order by empname DESC")
val analyzed = frame.queryExecution.analyzed
@@ -724,7 +674,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop datamap if exists MV_order")
sql("create datamap MV_order using 'mv' as select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname order by empname DESC")
- sql("rebuild datamap MV_order")
val frame = sql(
"select empname,sum(salary)+sum(utilization) as total from fact_table1 where empname = 'ravi' group by empname order by empname DESC")
val analyzed = frame.queryExecution.analyzed
@@ -739,14 +688,13 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("create table test1( name string,country string,age int,salary int) stored by 'carbondata'")
sql("insert into test1 select 'name1','USA',12,23")
sql("create datamap datamv2 using 'mv' as select country,sum(salary) from test1 group by country")
- sql("rebuild datamap datamv2")
val frame = sql("select country,sum(salary) from test1 where country='USA' group by country")
val analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamv2"))
sql("insert into test1 select 'name1','USA',12,23")
val frame1 = sql("select country,sum(salary) from test1 where country='USA' group by country")
val analyzed1 = frame1.queryExecution.analyzed
- assert(!verifyMVDataMap(analyzed1, "datamv2"))
+ assert(verifyMVDataMap(analyzed1, "datamv2"))
sql("drop datamap if exists datamv2")
sql("drop table if exists test1")
}
@@ -755,7 +703,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop datamap if exists MV_exp")
sql("create datamap MV_exp using 'mv' as select sum(salary),substring(empname,2,5),designation from fact_table1 group by substring(empname,2,5),designation")
- sql("rebuild datamap MV_exp")
val frame = sql(
"select sum(salary),substring(empname,2,5),designation from fact_table1 group by substring(empname,2,5),designation")
val analyzed = frame.queryExecution.analyzed
@@ -776,7 +723,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
""".stripMargin)
sql("drop datamap if exists MV_exp")
sql("create datamap MV_exp using 'mv' as select doj,sum(salary) from xy.fact_tablexy group by doj")
- sql("rebuild datamap MV_exp")
val frame = sql(
"select doj,sum(salary) from xy.fact_tablexy group by doj")
val analyzed = frame.queryExecution.analyzed
@@ -795,7 +741,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql(" insert into mvtable1 select 'n3',12,12")
sql(" insert into mvtable1 select 'n4',12,12")
sql("create datamap map1 using 'mv' as select name,sum(salary) from mvtable1 group by name")
- sql("rebuild datamap map1")
val frame = sql("select name,sum(salary) from mvtable1 group by name limit 1")
val analyzed = frame.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "map1"))
@@ -807,7 +752,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop datamap if exists datamap_comp_maxsumminavg")
sql("create datamap datamap_comp_maxsumminavg using 'mv' as select empname,max(projectenddate),sum(salary),min(projectjoindate),avg(attendance) from fact_table1 group by empname")
- sql("rebuild datamap datamap_comp_maxsumminavg")
val frame = sql(
"select empname,max(projectenddate),sum(salary),min(projectjoindate),avg(attendance) from fact_table1 group by empname")
val analyzed = frame.queryExecution.analyzed
@@ -821,7 +765,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
intercept[UnsupportedOperationException] {
sql(
"create datamap mv_unional using 'mv' as Select Z.deptname From (Select deptname,empname From fact_table1 Union All Select deptname,empname from fact_table2) Z")
- sql("rebuild datamap mv_unional")
}
sql("drop datamap if exists mv_unional")
}
@@ -833,7 +776,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql(
"create datamap MV_exp using 'mv' as select sum(case when deptno=11 and (utilization=92) then salary else 0 end) as t from fact_table1 group by empname")
- sql("rebuild datamap MV_exp")
val frame = sql(
"select sum(case when deptno=11 and (utilization=92) then salary else 0 end) as t from fact_table1 group by empname")
val analyzed = frame.queryExecution.analyzed
@@ -853,7 +795,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
}
sql("show datamap").show()
- sql("rebuild datamap MV_exp1")
val frame = sql(
"select empname, sum(utilization) from fact_table1 group by empname")
val analyzed = frame.queryExecution.analyzed
@@ -866,7 +807,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop datamap if exists datamap46")
sql("create datamap datamap46 using 'mv' as select deptname, sum(salary) from fact_table1 group by deptname")
- sql("rebuild datamap datamap46")
val frame = sql(
"select deptname as babu, sum(salary) from fact_table1 as tt group by deptname")
val analyzed = frame.queryExecution.analyzed
@@ -878,7 +818,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop datamap if exists datamap_subqry")
sql("create datamap datamap_subqry using 'mv' as select empname, min(salary) from fact_table1 group by empname")
- sql("rebuild datamap datamap_subqry")
val frame = sql(
"SELECT max(utilization) FROM fact_table1 WHERE salary IN (select min(salary) from fact_table1 group by empname ) group by empname")
val analyzed = frame.queryExecution.analyzed
@@ -889,10 +828,8 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
test("jira carbondata-2539-1") {
sql("drop datamap if exists datamap_comp_maxsumminavg")
sql("create datamap datamap_comp_maxsumminavg using 'mv' as select empname,max(projectenddate),sum(salary),min(projectjoindate),avg(attendance) from fact_table1 group by empname")
- sql("rebuild datamap datamap_comp_maxsumminavg")
sql("drop datamap if exists datamap_subqry")
sql("create datamap datamap_subqry using 'mv' as select min(salary) from fact_table1")
- sql("rebuild datamap datamap_subqry")
val frame = sql(
"SELECT max(utilization) FROM fact_table1 WHERE salary IN (select min(salary) from fact_table1) group by empname")
val analyzed = frame.queryExecution.analyzed
@@ -985,7 +922,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop datamap if exists dm_stream_test3")
sql("create datamap dm_stream_test3 using 'mv' as select empname, sum(utilization) from " +
"fact_table1 group by empname")
- sql("rebuild datamap dm_stream_test3")
val exception_tb_mv3: Exception = intercept[Exception] {
sql("alter table fact_table1 set tblproperties('streaming'='true')")
}
@@ -1009,7 +945,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
sql("insert into all_table select 1,1,null,1,1,1,null,1,1,1,1,1,1,1,1,1,1,1,1")
sql("create datamap all_table_mv on table all_table using 'mv' as " + querySQL)
- sql("rebuild datamap all_table_mv")
val frame = sql(querySQL)
val analyzed = frame.queryExecution.analyzed
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
new file mode 100644
index 0000000..bbd7b4c
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
@@ -0,0 +1,583 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.status.DataMapSegmentStatusUtil
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+
+
+/**
+ * Test Class to verify Incremental Load on MV Datamap
+ */
+class MVIncrementalLoadingTestcase extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll(): Unit = {
+ sql("drop table IF EXISTS test_table")
+ sql("drop table IF EXISTS test_table1")
+ sql("drop table IF EXISTS main_table")
+ sql("drop table IF EXISTS dimensiontable")
+ sql("drop table if exists products")
+ sql("drop table if exists sales")
+ sql("drop table if exists products1")
+ sql("drop table if exists sales1")
+ }
+
+ test("test Incremental Loading on rebuild MV Datamap") {
+ //create table and load data
+ createTableFactTable("test_table")
+ loadDataToFactTable("test_table")
+ createTableFactTable("test_table1")
+ loadDataToFactTable("test_table1")
+ //create datamap on table test_table
+ sql("drop datamap if exists datamap1")
+ sql(
+ "create datamap datamap1 using 'mv' with deferred rebuild as select empname, designation " +
+ "from test_table")
+ val query: String = "select empname from test_table"
+ val df1 = sql(query)
+ val analyzed1 = df1.queryExecution.analyzed
+ assert(!verifyMVDataMap(analyzed1, "datamap1"))
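+ // the datamap is lazy (deferred rebuild), so the query is served by it only after an explicit rebuild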
+ sql(s"rebuild datamap datamap1")
+ val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+ "datamap1_table"
+ )
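+ // each datamap segment records in its extraInfo the parent-table segments it was built from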
+ var loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ var segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(0).getExtraInfo)
+ val segmentList = new java.util.ArrayList[String]()
+ segmentList.add("0")
+ assert(segmentList.containsAll(segmentMap.get("default.test_table")))
+ val df2 = sql(query)
+ val analyzed2 = df2.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed2, "datamap1"))
+ loadDataToFactTable("test_table")
+ loadDataToFactTable("test_table1")
+ sql(s"rebuild datamap datamap1")
+ loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(1).getExtraInfo)
+ segmentList.clear()
+ segmentList.add("1")
+ assert(segmentList.containsAll(segmentMap.get("default.test_table")))
+ checkAnswer(sql("select empname, designation from test_table"),
+ sql("select empname, designation from test_table1"))
+ val df3 = sql(query)
+ val analyzed3 = df3.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed3, "datamap1"))
+ loadDataToFactTable("test_table")
+ loadDataToFactTable("test_table1")
+ val df4 = sql(query)
+ val analyzed4 = df4.queryExecution.analyzed
+ assert(!verifyMVDataMap(analyzed4, "datamap1"))
+ checkAnswer(sql("select empname, designation from test_table"),
+ sql("select empname, designation from test_table1"))
+ }
+
+ test("test MV incremental loading with main table having Marked for Delete segments") {
+ createTableFactTable("test_table")
+ loadDataToFactTable("test_table")
+ createTableFactTable("test_table1")
+ loadDataToFactTable("test_table1")
+ loadDataToFactTable("test_table")
+ loadDataToFactTable("test_table1")
+ sql("Delete from table test_table where segment.id in (0)")
+ sql("Delete from table test_table1 where segment.id in (0)")
+ sql("drop datamap if exists datamap1")
+ sql("create datamap datamap1 using 'mv' with deferred rebuild as select empname, designation " +
+ "from test_table")
+ loadDataToFactTable("test_table")
+ loadDataToFactTable("test_table1")
+ sql(s"rebuild datamap datamap1")
+ val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+ "datamap1_table")
+ val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ val segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(0).getExtraInfo)
+ val segmentList = new java.util.ArrayList[String]()
+ segmentList.add("1")
+ segmentList.add("2")
+ assert(segmentList.containsAll(segmentMap.get("default.test_table")))
+ checkAnswer(sql("select empname, designation from test_table"),
+ sql("select empname, designation from test_table1"))
+ dropTable("test_table")
+ dropTable("test_table1")
+ }
+
+ test("test MV incremental loading with update operation on main table") {
+ sql("drop table IF EXISTS main_table")
+ sql("drop table IF EXISTS testtable")
+ sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into main_table values('a','abc',1)")
+ sql("insert into main_table values('b','bcd',2)")
+ sql("create table testtable(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into testtable values('a','abc',1)")
+ sql("insert into testtable values('b','bcd',2)")
+ sql("drop datamap if exists datamap1")
+ sql("create datamap datamap1 using 'mv' with deferred rebuild as select a, sum(b) from main_table group by a")
+ sql(s"rebuild datamap datamap1")
+ var df = sql("select a, sum(b) from main_table group by a")
+ var analyzed = df.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed, "datamap1"))
+ checkAnswer(sql(" select a, sum(b) from testtable group by a"),
+ sql(" select a, sum(b) from main_table group by a"))
+ sql("update main_table set(a) = ('aaa') where b = 'abc'").show(false)
+ sql("update testtable set(a) = ('aaa') where b = 'abc'").show(false)
+ val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+ "datamap1_table"
+ )
+ var loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ assert(loadMetadataDetails(0).getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE)
+ checkAnswer(sql("select * from main_table"), sql("select * from testtable"))
+ sql(s"rebuild datamap datamap1")
+ loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ val segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(1).getExtraInfo)
+ val segmentList = new java.util.ArrayList[String]()
+ segmentList.add("0")
+ segmentList.add("1")
+ assert(segmentList.containsAll(segmentMap.get("default.main_table")))
+ df = sql(s""" select a, sum(b) from main_table group by a""".stripMargin)
+ analyzed = df.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed, "datamap1"))
+ checkAnswer(sql(" select a, sum(b) from testtable group by a"),
+ sql(" select a, sum(b) from main_table group by a"))
+ sql("drop table IF EXISTS main_table")
+ sql("drop table IF EXISTS testtable")
+ }
+
+ test("test compaction on mv datamap table") {
+ createTableFactTable("test_table")
+ loadDataToFactTable("test_table")
+ sql("drop datamap if exists datamap1")
+ sql(
+ "create datamap datamap1 using 'mv' with deferred rebuild as select empname, designation " +
+ "from test_table")
+ loadDataToFactTable("test_table")
+ sql(s"rebuild datamap datamap1")
+ loadDataToFactTable("test_table")
+ sql(s"rebuild datamap datamap1")
+ loadDataToFactTable("test_table")
+ sql(s"rebuild datamap datamap1")
+ checkExistence(sql("show segments for table datamap1_table"), false, "0.1")
+ sql("alter datamap datamap1 compact 'major'")
+ val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+ "datamap1_table"
+ )
+ val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ val segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(3).getExtraInfo)
+ val segmentList = new java.util.ArrayList[String]()
+ segmentList.add("0")
+ segmentList.add("1")
+ segmentList.add("2")
+ segmentList.add("3")
+ assert(segmentList.containsAll(segmentMap.get("default.test_table")))
+ checkExistence(sql("show segments for table datamap1_table"), true, "0.1")
+ sql("clean files for table datamap1_table")
+ sql("drop table IF EXISTS test_table")
+ }
+
+ test("test auto-compaction on mv datamap table") {
+ sql("set carbon.enable.auto.load.merge=true")
+ createTableFactTable("test_table")
+ loadDataToFactTable("test_table")
+ sql("drop datamap if exists datamap1")
+ sql(
+ "create datamap datamap1 using 'mv' with deferred rebuild as select empname, designation " +
+ "from test_table")
+ loadDataToFactTable("test_table")
+ sql(s"rebuild datamap datamap1")
+ loadDataToFactTable("test_table")
+ sql(s"rebuild datamap datamap1")
+ loadDataToFactTable("test_table")
+ sql(s"rebuild datamap datamap1")
+ loadDataToFactTable("test_table")
+ sql(s"rebuild datamap datamap1")
+ loadDataToFactTable("test_table")
+ sql(s"rebuild datamap datamap1")
+ loadDataToFactTable("test_table")
+ sql(s"rebuild datamap datamap1")
+ sql("clean files for table datamap1_table")
+ sql("clean files for table test_table")
+ val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+ "datamap1_table")
+ val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ val segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(2).getExtraInfo)
+ val segmentList = new java.util.ArrayList[String]()
+ segmentList.add("0.1")
+ segmentList.add("4")
+ segmentList.add("5")
+ segmentList.add("6")
+ assert(segmentList.containsAll(segmentMap.get("default.test_table")))
+ dropTable("test_table")
+ }
+
+ test("test insert overwrite") {
+ sql("drop table IF EXISTS test_table")
+ sql("create table test_table(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into test_table values('a','abc',1)")
+ sql("insert into test_table values('b','bcd',2)")
+ sql("drop datamap if exists datamap1")
+ sql(
+ "create datamap datamap1 using 'mv' with deferred rebuild as select a, sum(b) from test_table group by a")
+ sql(s"rebuild datamap datamap1")
+ checkAnswer(sql(" select a, sum(b) from test_table group by a"), Seq(Row("a", null), Row("b", null)))
+ sql("insert overwrite table test_table select 'd','abc',3")
+ val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+ "datamap1_table")
+ var loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ assert(loadMetadataDetails(0).getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE)
+ checkAnswer(sql(" select a, sum(b) from test_table group by a"), Seq(Row("d", null)))
+ sql(s"rebuild datamap datamap1")
+ loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ val segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(1).getExtraInfo)
+ val segmentList = new java.util.ArrayList[String]()
+ segmentList.add("2")
+ assert(segmentList.containsAll(segmentMap.get("default.test_table")))
+ sql("drop table IF EXISTS test_table")
+ }
+
+ test("test inner join with mv") {
+ sql("drop table if exists products")
+ sql("create table products (product string, amount int) stored by 'carbondata' ")
+ sql(s"load data INPATH '$resourcesPath/products.csv' into table products")
+ sql("drop table if exists sales")
+ sql("create table sales (product string, quantity int) stored by 'carbondata'")
+ sql(s"load data INPATH '$resourcesPath/sales_data.csv' into table sales")
+ sql("drop datamap if exists innerjoin")
+ sql("Create datamap innerjoin using 'mv' with deferred rebuild as Select p.product, p.amount, s.quantity, s.product from " +
+ "products p, sales s where p.product=s.product")
+ sql("drop table if exists products1")
+ sql("create table products1 (product string, amount int) stored by 'carbondata' ")
+ sql(s"load data INPATH '$resourcesPath/products.csv' into table products1")
+ sql("drop table if exists sales1")
+ sql("create table sales1 (product string, quantity int) stored by 'carbondata'")
+ sql(s"load data INPATH '$resourcesPath/sales_data.csv' into table sales1")
+ sql(s"rebuild datamap innerjoin")
+ checkAnswer(sql("Select p.product, p.amount, s.quantity from products1 p, sales1 s where p.product=s.product"),
+ sql("Select p.product, p.amount, s.quantity from products p, sales s where p.product=s.product"))
+ sql("insert into products values('Biscuits',10)")
+ sql("insert into products1 values('Biscuits',10)")
+ sql("rebuild datamap innerjoin")
+ checkAnswer(sql("Select p.product, p.amount, s.quantity from products1 p, sales1 s where p.product=s.product"),
+ sql("Select p.product, p.amount, s.quantity from products p, sales s where p.product=s.product"))
+ sql("insert into sales values('Biscuits',100)")
+ sql("insert into sales1 values('Biscuits',100)")
+ checkAnswer(sql("Select p.product, p.amount, s.quantity from products1 p, sales1 s where p.product=s.product"),
+ sql("Select p.product, p.amount, s.quantity from products p, sales s where p.product=s.product"))
+ }
+
+ test("test set segments with main table having mv datamap") {
+ sql("drop table IF EXISTS main_table")
+ sql("drop table IF EXISTS test_table")
+ sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into main_table values('a','abc',1)")
+ sql("insert into main_table values('b','bcd',2)")
+ sql("create table test_table(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into test_table values('a','abc',1)")
+ sql("insert into test_table values('b','bcd',2)")
+ sql("drop datamap if exists datamap_mt")
+ sql(
+ "create datamap datamap_mt using 'mv' with deferred rebuild as select a, sum(b) from main_table group by a")
+ sql(s"rebuild datamap datamap_mt")
+ checkAnswer(sql("select a, sum(b) from main_table group by a"),
+ sql("select a, sum(b) from test_table group by a"))
+ sql("SET carbon.input.segments.default.main_table = 1")
+ sql("SET carbon.input.segments.default.test_table=1")
+ checkAnswer(sql("select a, sum(b) from main_table group by a"),
+ sql("select a, sum(b) from test_table group by a"))
+ sql("drop table IF EXISTS main_table")
+ sql("drop table IF EXISTS test_table")
+ }
+
+ test("test set segments with main table having mv datamap before rebuild") {
+ sql("drop table IF EXISTS main_table")
+ sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into main_table values('a','abc',1)")
+ sql("insert into main_table values('b','bcd',2)")
+ sql("drop datamap if exists datamap1")
+ sql("create datamap datamap1 using 'mv' with deferred rebuild as select a, sum(c) from main_table group by a")
+ sql("SET carbon.input.segments.default.main_table=1")
+ sql(s"rebuild datamap datamap1")
+ val df = sql("select a, sum(c) from main_table group by a")
+ val analyzed = df.queryExecution.analyzed
+ assert(!verifyMVDataMap(analyzed, "datamap1"))
+ sql("reset")
+ checkAnswer(sql("select a, sum(c) from main_table group by a"), Seq(Row("a", 1), Row("b", 2)))
+ val df1= sql("select a, sum(c) from main_table group by a")
+ val analyzed1 = df1.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed1, "datamap1"))
+ sql("drop table IF EXISTS main_table")
+ }
+
+ test("test datamap table after datamap table compaction- custom") {
+ sql("drop table IF EXISTS main_table")
+ sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into main_table values('a','abc',1)")
+ sql("insert into main_table values('b','bcd',2)")
+ sql("drop datamap if exists datamap1")
+ sql("create datamap datamap1 using 'mv' with deferred rebuild as select a, sum(b) from main_table group by a")
+ sql(s"rebuild datamap datamap1")
+ sql("insert into main_table values('a','abc',1)")
+ sql("insert into main_table values('b','bcd',2)")
+ sql(s"rebuild datamap datamap1")
+ sql("alter datamap datamap1 compact 'custom' where segment.id in (0,1)")
+ val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+ "datamap1_table")
+ val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ assert(loadMetadataDetails(0).getSegmentStatus == SegmentStatus.COMPACTED)
+ assert(loadMetadataDetails(1).getSegmentStatus == SegmentStatus.COMPACTED)
+ val segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(2).getExtraInfo)
+ val segmentList = new java.util.ArrayList[String]()
+ segmentList.add("0")
+ segmentList.add("1")
+ segmentList.add("2")
+ segmentList.add("3")
+ assert(segmentList.containsAll(segmentMap.get("default.main_table")))
+ sql("drop table IF EXISTS main_table")
+ }
+
+ test("test sum(a) + sum(b)") {
+ // Full rebuild will happen in this case
+ sql("drop table IF EXISTS main_table")
+ sql("create table main_table(a int,b int,c int) stored by 'carbondata'")
+ sql("insert into main_table values(1,2,3)")
+ sql("insert into main_table values(1,4,5)")
+ sql("drop datamap if exists datamap_1")
+ sql("create datamap datamap_1 using 'mv' with deferred rebuild as select sum(a)+sum(b) from main_table")
+ checkAnswer(sql("select sum(a)+sum(b) from main_table"), Seq(Row(8)))
+ sql("rebuild datamap datamap_1")
+ checkAnswer(sql("select sum(a)+sum(b) from main_table"), Seq(Row(8)))
+ sql("insert into main_table values(1,2,3)")
+ sql("insert into main_table values(1,4,5)")
+ checkAnswer(sql("select sum(a)+sum(b) from main_table"), Seq(Row(16)))
+ sql("rebuild datamap datamap_1")
+ checkAnswer(sql("select sum(a)+sum(b) from main_table"), Seq(Row(16)))
+ sql("drop table IF EXISTS main_table")
+ }
+
+ test("test Incremental Loading on non-lazy mv Datamap") {
+ //create table and load data
+ createTableFactTable("test_table")
+ loadDataToFactTable("test_table")
+ createTableFactTable("test_table1")
+ loadDataToFactTable("test_table1")
+ //create datamap on table test_table
+ sql("drop datamap if exists datamap1")
+ sql(
+ "create datamap datamap1 using 'mv' as select empname, designation " +
+ "from test_table")
+ val query: String = "select empname from test_table"
+ val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+ "datamap1_table"
+ )
+ var loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ var segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(0).getExtraInfo)
+ val segmentList = new java.util.ArrayList[String]()
+ segmentList.add("0")
+ assert(segmentList.containsAll(segmentMap.get("default.test_table")))
+ val df2 = sql(query)
+ val analyzed2 = df2.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed2, "datamap1"))
+ loadDataToFactTable("test_table")
+ loadDataToFactTable("test_table1")
+ loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(1).getExtraInfo)
+ segmentList.clear()
+ segmentList.add("1")
+ assert(segmentList.containsAll(segmentMap.get("default.test_table")))
+ checkAnswer(sql("select empname, designation from test_table"),
+ sql("select empname, designation from test_table1"))
+ val df3 = sql(query)
+ val analyzed3 = df3.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed3, "datamap1"))
+ loadDataToFactTable("test_table")
+ loadDataToFactTable("test_table1")
+ val df4 = sql(query)
+ val analyzed4 = df4.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed4, "datamap1"))
+ checkAnswer(sql("select empname, designation from test_table"),
+ sql("select empname, designation from test_table1"))
+ }
+
+ test("test MV incremental loading on non-lazy datamap with update operation on main table") {
+ sql("drop table IF EXISTS main_table")
+ sql("drop table IF EXISTS testtable")
+ sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into main_table values('a','abc',1)")
+ sql("insert into main_table values('b','bcd',2)")
+ sql("create table testtable(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into testtable values('a','abc',1)")
+ sql("insert into testtable values('b','bcd',2)")
+ sql("drop datamap if exists datamap1")
+ sql("create datamap datamap1 using 'mv' as select a, sum(b) from main_table group by a")
+ var df = sql(s"""select a, sum(b) from main_table group by a""".stripMargin)
+ var analyzed = df.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed, "datamap1"))
+ checkAnswer(sql(" select a, sum(b) from testtable group by a"),
+ sql(" select a, sum(b) from main_table group by a"))
+ sql("update main_table set(a) = ('aaa') where b = 'abc'").show(false)
+ sql("update testtable set(a) = ('aaa') where b = 'abc'").show(false)
+ val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+ "datamap1_table")
+ val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ assert(loadMetadataDetails(0).getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE)
+ val segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(1).getExtraInfo)
+ val segmentList = new java.util.ArrayList[String]()
+ segmentList.add("0")
+ segmentList.add("1")
+ assert(segmentList.containsAll(segmentMap.get("default.main_table")))
+ df = sql(s""" select a, sum(b) from main_table group by a""".stripMargin)
+ analyzed = df.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed, "datamap1"))
+ checkAnswer(sql(" select a, sum(b) from testtable group by a"),
+ sql(" select a, sum(b) from main_table group by a"))
+ sql("drop table IF EXISTS main_table")
+ sql("drop table IF EXISTS testtable")
+ }
+
+ test("test MV incremental loading on non-lazy datamap with delete operation on main table") {
+ sql("drop table IF EXISTS main_table")
+ sql("drop table IF EXISTS testtable")
+ sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into main_table values('a','abc',1)")
+ sql("insert into main_table values('b','bcd',2)")
+ sql("create table testtable(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into testtable values('a','abc',1)")
+ sql("insert into testtable values('b','bcd',2)")
+ sql("drop datamap if exists datamap1")
+ sql("create datamap datamap1 using 'mv' as select a, sum(b) from main_table group by a")
+ var df = sql(s"""select a, sum(b) from main_table group by a""".stripMargin)
+ var analyzed = df.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed, "datamap1"))
+ checkAnswer(sql(" select a, sum(b) from testtable group by a"),
+ sql(" select a, sum(b) from main_table group by a"))
+ sql("delete from main_table where b = 'abc'").show(false)
+ sql("delete from testtable where b = 'abc'").show(false)
+ val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+ "datamap1_table")
+ val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ assert(loadMetadataDetails(0).getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE)
+ val segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(1).getExtraInfo)
+ val segmentList = new java.util.ArrayList[String]()
+ segmentList.add("0")
+ segmentList.add("1")
+ assert(segmentList.containsAll(segmentMap.get("default.main_table")))
+ df = sql(s""" select a, sum(b) from main_table group by a""".stripMargin)
+ analyzed = df.queryExecution.analyzed
+ assert(verifyMVDataMap(analyzed, "datamap1"))
+ checkAnswer(sql(" select a, sum(b) from testtable group by a"),
+ sql(" select a, sum(b) from main_table group by a"))
+ sql("drop table IF EXISTS main_table")
+ sql("drop table IF EXISTS testtable")
+ }
+
+ test("test whether datamap table is compacted after main table compaction") {
+ sql("drop table IF EXISTS main_table")
+ sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into main_table values('a','abc',1)")
+ sql("insert into main_table values('b','bcd',2)")
+ sql("drop datamap if exists datamap1")
+ sql("create datamap datamap1 using 'mv' as select a, sum(b) from main_table group by a")
+ sql("insert into main_table values('c','abc',1)")
+ sql("insert into main_table values('d','bcd',2)")
+ sql("alter table main_table compact 'major'")
+ checkExistence(sql("show segments for table main_table"), true, "0.1")
+ checkExistence(sql("show segments for table datamap1_table"), true, "0.1")
+ sql("drop table IF EXISTS main_table")
+ }
+
+ test("test delete record when table contains single segment") {
+ sql("drop table IF EXISTS main_table")
+ sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into main_table values('a','abc',1)")
+ sql("drop datamap if exists datamap1")
+ sql("create datamap datamap1 using 'mv' as select a, sum(b) from main_table group by a")
+ sql("delete from main_table where b = 'abc'").show(false)
+ val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+ "datamap1_table")
+ val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+ assert(loadMetadataDetails.length == 1)
+ assert(loadMetadataDetails(0).getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE)
+ }
+
+ test("set segments on datamap table") {
+ sql("drop table IF EXISTS main_table")
+ sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+ sql("insert into main_table values('a','abc',1)")
+ sql("drop datamap if exists datamap1")
+ sql("create datamap datamap1 using 'mv' as select a,b from main_table")
+ sql("insert into main_table values('b','abcd',1)")
+ sql("SET carbon.input.segments.default.datamap1_table=0")
+ assert(sql("select a,b from main_table").count() == 1)
+ sql("drop table IF EXISTS main_table")
+ }
+
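+ // a query is considered rewritten to the MV when its plan scans the datamap's backing table (dataMapName + "_table")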
+ def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
+ val tables = logicalPlan collect {
+ case l: LogicalRelation => l.catalogTable.get
+ }
+ tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName + "_table"))
+ }
+
+ override def afterAll(): Unit = {
+ sql("drop table if exists products")
+ sql("drop table if exists sales")
+ sql("drop table if exists products1")
+ sql("drop table if exists sales1")
+ sql("drop table IF EXISTS test_table")
+ sql("drop table IF EXISTS test_table1")
+ sql("drop table IF EXISTS main_table")
+ sql("drop table IF EXISTS dimensiontable")
+ }
+
+ private def createTableFactTable(tableName: String) = {
+ sql(s"drop table IF EXISTS $tableName")
+ sql(
+ s"""
+ | CREATE TABLE $tableName (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ }
+
+ private def loadDataToFactTable(tableName: String) = {
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE $tableName OPTIONS
+ |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+ }
+}
\ No newline at end of file
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
index 6068ef5..1747e51 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
@@ -83,7 +83,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with sampleTestCases case_1") {
sql(s"drop datamap if exists datamap_sm1")
sql(s"create datamap datamap_sm1 using 'mv' as ${sampleTestCases(0)._2}")
- sql(s"rebuild datamap datamap_sm1")
val df = sql(sampleTestCases(0)._3)
val analyzed = df.queryExecution.analyzed
assert(!verifyMVDataMap(analyzed, "datamap_sm1"))
@@ -93,7 +92,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with sampleTestCases case_3") {
sql(s"drop datamap if exists datamap_sm2")
sql(s"create datamap datamap_sm2 using 'mv' as ${sampleTestCases(2)._2}")
- sql(s"rebuild datamap datamap_sm2")
val df = sql(sampleTestCases(2)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_sm2"))
@@ -103,7 +101,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with sampleTestCases case_4") {
sql(s"drop datamap if exists datamap_sm3")
sql(s"create datamap datamap_sm3 using 'mv' as ${sampleTestCases(3)._2}")
- sql(s"rebuild datamap datamap_sm3")
val df = sql(sampleTestCases(3)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_sm3"))
@@ -113,7 +110,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with sampleTestCases case_5") {
sql(s"drop datamap if exists datamap_sm4")
sql(s"create datamap datamap_sm4 using 'mv' as ${sampleTestCases(4)._2}")
- sql(s"rebuild datamap datamap_sm4")
val df = sql(sampleTestCases(4)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_sm4"))
@@ -123,7 +119,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with sampleTestCases case_6") {
sql(s"drop datamap if exists datamap_sm5")
sql(s"create datamap datamap_sm5 using 'mv' as ${sampleTestCases(5)._2}")
- sql(s"rebuild datamap datamap_sm5")
val df = sql(sampleTestCases(5)._3)
val analyzed = df.queryExecution.analyzed
assert(!verifyMVDataMap(analyzed, "datamap_sm5"))
@@ -133,7 +128,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with sampleTestCases case_7") {
sql(s"drop datamap if exists datamap_sm6")
sql(s"create datamap datamap_sm6 using 'mv' as ${sampleTestCases(6)._2}")
- sql(s"rebuild datamap datamap_sm6")
val df = sql(sampleTestCases(6)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_sm6"))
@@ -143,7 +137,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with sampleTestCases case_8") {
sql(s"drop datamap if exists datamap_sm7")
sql(s"create datamap datamap_sm7 using 'mv' as ${sampleTestCases(7)._2}")
- sql(s"rebuild datamap datamap_sm7")
val df = sql(sampleTestCases(7)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_sm7"))
@@ -153,7 +146,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with sampleTestCases case_9") {
sql(s"drop datamap if exists datamap_sm8")
sql(s"create datamap datamap_sm8 using 'mv' as ${sampleTestCases(8)._2}")
- sql(s"rebuild datamap datamap_sm8")
val df = sql(sampleTestCases(8)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_sm8"))
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
index b2d03e1..7304527 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
@@ -51,7 +51,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpcds_1_4_testCases case_1") {
sql(s"drop datamap if exists datamap_tpcds1")
sql(s"create datamap datamap_tpcds1 using 'mv' as ${tpcds_1_4_testCases(0)._2}")
- sql(s"rebuild datamap datamap_tpcds1")
val df = sql(tpcds_1_4_testCases(0)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_tpcds1"))
@@ -61,7 +60,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpcds_1_4_testCases case_3") {
sql(s"drop datamap if exists datamap_tpcds3")
sql(s"create datamap datamap_tpcds3 using 'mv' as ${tpcds_1_4_testCases(2)._2}")
- sql(s"rebuild datamap datamap_tpcds3")
val df = sql(tpcds_1_4_testCases(2)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_tpcds3"))
@@ -71,7 +69,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpcds_1_4_testCases case_4") {
sql(s"drop datamap if exists datamap_tpcds4")
sql(s"create datamap datamap_tpcds4 using 'mv' as ${tpcds_1_4_testCases(3)._2}")
- sql(s"rebuild datamap datamap_tpcds4")
val df = sql(tpcds_1_4_testCases(3)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_tpcds4"))
@@ -81,7 +78,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpcds_1_4_testCases case_5") {
sql(s"drop datamap if exists datamap_tpcds5")
sql(s"create datamap datamap_tpcds5 using 'mv' as ${tpcds_1_4_testCases(4)._2}")
- sql(s"rebuild datamap datamap_tpcds5")
val df = sql(tpcds_1_4_testCases(4)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_tpcds5"))
@@ -91,7 +87,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpcds_1_4_testCases case_6") {
sql(s"drop datamap if exists datamap_tpcds6")
sql(s"create datamap datamap_tpcds6 using 'mv' as ${tpcds_1_4_testCases(5)._2}")
- sql(s"rebuild datamap datamap_tpcds6")
val df = sql(tpcds_1_4_testCases(5)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_tpcds6"))
@@ -101,7 +96,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpcds_1_4_testCases case_8") {
sql(s"drop datamap if exists datamap_tpcds8")
sql(s"create datamap datamap_tpcds8 using 'mv' as ${tpcds_1_4_testCases(7)._2}")
- sql(s"rebuild datamap datamap_tpcds8")
val df = sql(tpcds_1_4_testCases(7)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_tpcds8"))
@@ -111,7 +105,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpcds_1_4_testCases case_11") {
sql(s"drop datamap if exists datamap_tpcds11")
sql(s"create datamap datamap_tpcds11 using 'mv' as ${tpcds_1_4_testCases(10)._2}")
- sql(s"rebuild datamap datamap_tpcds11")
val df = sql(tpcds_1_4_testCases(10)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_tpcds11"))
@@ -121,7 +114,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpcds_1_4_testCases case_15") {
sql(s"drop datamap if exists datamap_tpcds15")
sql(s"create datamap datamap_tpcds15 using 'mv' as ${tpcds_1_4_testCases(14)._2}")
- sql(s"rebuild datamap datamap_tpcds15")
val df = sql(tpcds_1_4_testCases(14)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_tpcds15"))
@@ -131,7 +123,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpcds_1_4_testCases case_16") {
sql(s"drop datamap if exists datamap_tpcds16")
sql(s"create datamap datamap_tpcds16 using 'mv' as ${tpcds_1_4_testCases(15)._2}")
- sql(s"rebuild datamap datamap_tpcds16")
val df = sql(tpcds_1_4_testCases(15)._3)
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap_tpcds16"))
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
index ff5bdac..5788a23 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
@@ -71,7 +71,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch1") {
sql(s"drop datamap if exists datamap1")
sql("create datamap datamap1 using 'mv' as select l_returnflag, l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,count(*) as count_order from lineitem group by l_returnflag, l_linestatus,l_shipdate")
- sql(s"rebuild datamap datamap1")
val df = sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,count(*) as count_order from lineitem where l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap1"))
@@ -82,7 +81,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch1 with order") {
sql(s"drop datamap if exists datamap2")
sql("create datamap datamap2 using 'mv' as select l_returnflag, l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem group by l_returnflag, l_linestatus,l_shipdate order by l_returnflag, l_linestatus")
- sql(s"rebuild datamap datamap2")
val df = sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem where l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap2"))
@@ -93,7 +91,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch1 with sub group by") {
sql(s"drop datamap if exists datamap3")
sql("create datamap datamap3 using 'mv' as select l_returnflag, l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem group by l_returnflag, l_linestatus,l_shipdate")
- sql(s"rebuild datamap datamap3")
val df = sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price from lineitem where l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap3"))
@@ -104,7 +101,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpch3") {
sql(s"drop datamap if exists datamap4")
sql("create datamap datamap4 using 'mv' as select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority")
- sql(s"rebuild datamap datamap4")
val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap4"))
@@ -115,7 +111,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch3 with no filters on mv") {
sql(s"drop datamap if exists datamap5")
sql("create datamap datamap5 using 'mv' as select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority,c_mktsegment,l_shipdate from customer, orders, lineitem where c_custkey = o_custkey and l_orderkey = o_orderkey group by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate")
- sql(s"rebuild datamap datamap5")
val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap5"))
@@ -126,7 +121,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpch3 with filters on mv and all filter columns on projection") {
sql(s"drop datamap if exists datamap5")
sql("create datamap datamap5 using 'mv' as select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority,c_mktsegment,l_shipdate from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate")
- sql(s"rebuild datamap datamap5")
val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap5"))
@@ -137,7 +131,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpch4 (core issue)") {
sql(s"drop datamap if exists datamap6")
sql("create datamap datamap6 using 'mv' as select o_orderpriority, count(*) as order_count from orders where o_orderdate >= date('1993-07-01') and o_orderdate < date('1993-10-01') and exists ( select * from lineitem where l_orderkey = o_orderkey and l_commitdate < l_receiptdate ) group by o_orderpriority order by o_orderpriority")
- sql(s"rebuild datamap datamap6")
val df = sql("select o_orderpriority, count(*) as order_count from orders where o_orderdate >= date('1993-07-01') and o_orderdate < date('1993-10-01') and exists ( select * from lineitem where l_orderkey = o_orderkey and l_commitdate < l_receiptdate ) group by o_orderpriority order by o_orderpriority")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap6"))
@@ -148,7 +141,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpch5") {
sql(s"drop datamap if exists datamap7")
sql("create datamap datamap7 using 'mv' as select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name")
- sql(s"rebuild datamap datamap7")
val df = sql("select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name order by revenue desc")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap7"))
@@ -159,7 +151,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch5 with no filters on mv") {
sql(s"drop datamap if exists datamap8")
sql("create datamap datamap8 using 'mv' as select n_name,o_orderdate,r_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey group by n_name,o_orderdate,r_name")
- sql(s"rebuild datamap datamap8")
val df = sql("select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name order by revenue desc")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap8"))
@@ -170,7 +161,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch6") {
sql(s"drop datamap if exists datamap9")
sql("create datamap datamap9 using 'mv' as select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
- sql(s"rebuild datamap datamap9")
val df = sql("select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap9"))
@@ -182,7 +172,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch6 with no filters on mv") {
sql(s"drop datamap if exists datamap10")
sql("create datamap datamap10 using 'mv' as select sum(l_extendedprice * l_discount) as revenue,l_shipdate,l_discount,l_quantity from lineitem group by l_shipdate,l_discount,l_quantity")
- sql(s"rebuild datamap datamap10")
val df = sql("select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap10"))
@@ -194,7 +183,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch7 part of query1") {
sql(s"drop datamap if exists datamap11")
sql("create datamap datamap11 using 'mv' as select l_shipdate,n_name , l_extendedprice , l_discount from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
- sql(s"rebuild datamap datamap11")
val df = sql("select year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE') or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01') and date('1996-12-31')")
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap11"))
@@ -205,7 +193,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
test("test create datamap with tpch7 part of query2 (core issue)") {
sql(s"drop datamap if exists datamap12")
sql("create datamap datamap12 using 'mv' as select n1.n_name, l_shipdate, l_extendedprice ,l_discount from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
- sql(s"rebuild datamap datamap12")
val df = sql("select supp_nation, l_year, sum(volume) as revenue from ( select n1.n_name as supp_nation, year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE' ) or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01') and date('19 [...]
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap12"))
@@ -216,7 +203,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
ignore("test create datamap with tpch7 part of query3 (self join issue)") {
sql(s"drop datamap if exists datamap13")
sql("create datamap datamap13 using 'mv' as select n1.n_name as supp_nation, n2.n_name as cust_nation, l_shipdate, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1,nation n2 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n2.n_nationkey")
- sql(s"rebuild datamap datamap13")
val df = sql("select supp_nation, cust_nation, l_year, sum(volume) as revenue from ( select n1.n_name as supp_nation, n2.n_name as cust_nation, year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1,nation n2 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n2.n_nationkey and ( (n1.n_name = 'FRANCE' and n2.n_name = 'GERMANY') or (n1.n_ [...]
val analyzed = df.queryExecution.analyzed
assert(verifyMVDataMap(analyzed, "datamap13"))
diff --git a/integration/spark-common-test/src/test/resources/products.csv b/integration/spark-common-test/src/test/resources/products.csv
new file mode 100644
index 0000000..3d0c6a7
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/products.csv
@@ -0,0 +1,5 @@
+product,amount
+Mobile,2000
+Laptop,3000
+Kettle,70
+Washing Machine,1000
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/resources/sales_data.csv b/integration/spark-common-test/src/test/resources/sales_data.csv
new file mode 100644
index 0000000..c80b9e5
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/sales_data.csv
@@ -0,0 +1,5 @@
+product,quantity
+Mobile,1
+Laptop,10
+Chocolates,200
+Biscuits,800
\ No newline at end of file
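
products.csv and sales_data.csv are fixtures for the new MVIncrementalLoadingTestcase. A sketch of how such a resource is typically consumed in these suites, assuming the standard QueryTest helpers (resourcesPath) and a hypothetical table name:

    sql("create table products(product string, amount int) stored by 'carbondata'")
    sql(s"load data inpath '$resourcesPath/products.csv' into table products")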
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
index 8e1f22a..f6031d1 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.datamap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.exceptions.MetadataProcessException;
@@ -98,8 +99,9 @@ public class IndexDataMapProvider extends DataMapProvider {
}
@Override
- public void rebuild() {
+ public boolean rebuild() {
IndexDataMapRebuildRDD.rebuildDataMap(sparkSession, getMainTable(), getDataMapSchema());
+ return true;
}
private DataMapFactory<? extends DataMap> createDataMapFactory()
@@ -132,4 +134,9 @@ public class IndexDataMapProvider extends DataMapProvider {
public boolean supportRebuild() {
return dataMapFactory.supportRebuild();
}
+
+ @Override
+ public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap) {
+ return false;
+ }
}
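
The provider contract changes in two ways here: rebuild() now reports whether the datamap ended up fully in sync with its parent, and rebuildInternal(...) is the new hook for loading a single child segment against the parent segments it covers. Index datamaps cannot load incrementally, so they return false. The shape of the contract, sketched as a Scala trait for illustration only:

    import java.util.{List => JList, Map => JMap}

    trait RebuildableDataMap {
      /** full rebuild; true when the datamap is in sync with the parent afterwards */
      def rebuild(): Boolean
      /** incremental load of one new child segment, keyed by parent segment ids */
      def rebuildInternal(newLoadName: String,
          segmentMap: JMap[String, JList[String]]): Boolean
    }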
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
index 233c41f..72390ce 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.datamap;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
@@ -94,10 +95,11 @@ public class PreAggregateDataMapProvider extends DataMapProvider {
}
@Override
- public void rebuild() {
+ public boolean rebuild() {
if (helper != null) {
helper.initData(sparkSession);
}
+ return true;
}
@Override
@@ -109,4 +111,9 @@ public class PreAggregateDataMapProvider extends DataMapProvider {
public boolean supportRebuild() {
return false;
}
+
+ @Override
+ public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap) {
+ return false;
+ }
}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 82c893f..682e76c 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -953,6 +953,15 @@ object CarbonDataRDDFactory {
throw new Exception(errorMessage)
} else {
DataMapStatusManager.disableAllLazyDataMaps(carbonTable)
+ if (overwriteTable) {
+ val allDataMapSchemas = DataMapStoreManager.getInstance
+ .getDataMapSchemasOfTable(carbonTable).asScala
+ .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+ !dataMapSchema.isIndexDataMap).asJava
+ if (!allDataMapSchemas.isEmpty) {
+ DataMapStatusManager.truncateDataMap(allDataMapSchemas)
+ }
+ }
}
(done, metadataDetails)
}
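
The block above filters the table's datamap schemas down to MV ones (a relation identifier is present and it is not an index datamap). The same filter recurs in the clean files, update, delete and listener code below; a shared helper along these lines (hypothetical, not part of this patch) would capture the intent once:

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.datamap.DataMapStoreManager
    import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}

    def mvSchemasOf(table: CarbonTable): Seq[DataMapSchema] =
      DataMapStoreManager.getInstance
        .getDataMapSchemasOfTable(table).asScala
        .filter(s => s.getRelationIdentifier != null && !s.isIndexDataMap)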
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 149e45e..d9dec68 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
import org.apache.spark.sql.execution.command.cache._
+import org.apache.spark.sql.execution.command.mv.{AlterDataMaptableCompactionPostListener, LoadPostDataMapListener}
import org.apache.spark.sql.execution.command.preaaggregate._
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
import org.apache.spark.sql.hive._
@@ -37,7 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util._
import org.apache.carbondata.datamap.{TextMatchMaxDocUDF, TextMatchUDF}
import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.spark.rdd.SparkReadSupport
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
@@ -173,6 +174,8 @@ object CarbonEnv {
.addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener)
.addListener(classOf[AlterTableCompactionPreStatusUpdateEvent],
AlterPreAggregateTableCompactionPostListener)
+ .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent],
+ AlterDataMaptableCompactionPostListener)
.addListener(classOf[LoadMetadataEvent], LoadProcessMetaListener)
.addListener(classOf[LoadMetadataEvent], CompactionProcessMetaListener)
.addListener(classOf[LoadTablePostStatusUpdateEvent], CommitPreAggregateListener)
@@ -183,6 +186,9 @@ object CarbonEnv {
AlterTableDropPartitionPostStatusListener)
.addListener(classOf[AlterTableDropPartitionMetaEvent], AlterTableDropPartitionMetaListener)
.addListener(classOf[LoadTablePreStatusUpdateEvent], new MergeIndexEventListener)
+ .addListener(classOf[LoadTablePostExecutionEvent], LoadPostDataMapListener)
+ .addListener(classOf[UpdateTablePostEvent], LoadPostDataMapListener)
+ .addListener(classOf[DeleteFromTablePostEvent], LoadPostDataMapListener)
.addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
.addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
.addListener(classOf[DropTableCacheEvent], DropCachePreAggEventListener)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 271a19b..d9a6490 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -87,13 +87,6 @@ case class CarbonCreateDataMapCommand(
val property = dmProperties.map(x => (x._1.trim, x._2.trim)).asJava
val javaMap = new java.util.HashMap[String, String](property)
- // for MV, it is deferred rebuild by default and cannot be non-deferred rebuild
- if (dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.MV.getShortName)) {
- if (!deferredRebuild) {
- LOGGER.warn(s"DEFERRED REBUILD is enabled by default for MV datamap $dataMapName")
- }
- deferredRebuild = true
- }
javaMap.put(DataMapProperty.DEFERRED_REBUILD, deferredRebuild.toString)
dataMapSchema.setProperties(javaMap)
@@ -180,6 +173,10 @@ case class CarbonCreateDataMapCommand(
operationContext)
}
}
+ if (null != dataMapSchema.getRelationIdentifier && !dataMapSchema.isIndexDataMap &&
+ !dataMapSchema.isLazy) {
+ DataMapStatusManager.enableDataMap(dataMapName)
+ }
}
Seq.empty
}
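
With the forced DEFERRED REBUILD gone, the user chooses the mode at create time, and a freshly created non-lazy MV datamap (one with a relation identifier, not an index datamap, not lazy) is enabled immediately. For example, against a hypothetical sales table:

    // enabled right away; queries can be rewritten without a rebuild
    sql("create datamap agg_sales using 'mv' as " +
      "select product, sum(quantity) from sales group by product")

    // stays disabled until an explicit rebuild
    sql("create datamap agg_sales_lazy using 'mv' with deferred rebuild as " +
      "select product, sum(quantity) from sales group by product")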
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
index 267fedd..345c5a5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.execution.command.DataCommand
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV
+import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.datamap.DataMapManager
import org.apache.carbondata.events.{UpdateDataMapPostExecutionEvent, _}
@@ -50,7 +52,8 @@ case class CarbonDataMapRebuildCommand(
}
}
val schema = schemaOption.get
- if (!schema.isLazy) {
+ if (!schema.isLazy &&
+ (schema.isIndexDataMap || schema.isInstanceOf[AggregationDataMapSchema])) {
throw new MalformedDataMapCommandException(
s"Non-lazy datamap $dataMapName does not support rebuild")
}
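
The guard is narrowed: REBUILD is now rejected only for non-lazy index and pre-aggregate (AggregationDataMapSchema) datamaps, while MV datamaps remain rebuildable so pending main-table segments can be synced on demand. Expected behaviour, sketched with hypothetical datamap names:

    sql("rebuild datamap agg_sales_lazy")        // ok: lazy MV, loads pending segments
    intercept[MalformedDataMapCommandException] {
      sql("rebuild datamap bloom_nonlazy")       // rejected: non-lazy index datamap
    }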
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 9d8bf90..ee8e8f2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -34,6 +34,8 @@ import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapUtil
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
@@ -320,6 +322,7 @@ case class CarbonAlterTableCompactionCommand(
throw e
} finally {
updateLock.unlock()
+ DataMapStatusManager.disableAllLazyDataMaps(carbonTable)
}
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 4b5b4b6..c142398 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events._
@@ -70,6 +71,19 @@ case class CarbonCleanFilesCommand(
isInternalCleanCall = true)
}.toList
cleanFileCommands.foreach(_.processMetadata(sparkSession))
+ } else if (CarbonTable.hasMVDataMap(carbonTable)) {
+ val allDataMapSchemas = DataMapStoreManager.getInstance
+ .getDataMapSchemasOfTable(carbonTable).asScala
+ .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+ !dataMapSchema.isIndexDataMap)
+ cleanFileCommands = allDataMapSchemas.map {
+ dataMapSchema =>
+ val relationIdentifier = dataMapSchema.getRelationIdentifier
+ CarbonCleanFilesCommand(
+ Some(relationIdentifier.getDatabaseName), Some(relationIdentifier.getTableName),
+ isInternalCleanCall = true)
+ }.toList
+ cleanFileCommands.foreach(_.processMetadata(sparkSession))
} else if (carbonTable.isChildDataMap && !isInternalCleanCall) {
throwMetadataException(
carbonTable.getDatabaseName, carbonTable.getTableName,
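
Clean files now cascades from a main table to its MV datamap tables, mirroring the pre-aggregate branch above it, so obsolete segments of the child tables are removed in the same pass (maintable below is a hypothetical name):

    sql("clean files for table maintable")   // also cleans maintable's MV datamap tables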
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 8b8fe0d..3521c97 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -17,12 +17,16 @@
package org.apache.spark.sql.execution.command.mutation
+import scala.collection.JavaConverters._
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command._
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -118,6 +122,14 @@ private[sql] case class CarbonProjectForDeleteCommand(
throw new Exception(executorErrors.errorMsg)
}
+ val allDataMapSchemas = DataMapStoreManager.getInstance
+ .getDataMapSchemasOfTable(carbonTable).asScala
+ .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+ !dataMapSchema.isIndexDataMap).asJava
+ if (!allDataMapSchemas.isEmpty) {
+ DataMapStatusManager.truncateDataMap(allDataMapSchemas)
+ }
+
// trigger post event for Delete from table
val deleteFromTablePostEvent: DeleteFromTablePostEvent =
DeleteFromTablePostEvent(sparkSession, carbonTable)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 9fbf745..20a8435 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.command.mutation
+import scala.collection.JavaConverters._
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command._
@@ -28,7 +30,8 @@ import org.apache.spark.storage.StorageLevel
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -171,6 +174,13 @@ private[sql] case class CarbonProjectForUpdateCommand(
HorizontalCompaction.tryHorizontalCompaction(
sparkSession, carbonTable, isUpdateOperation = true)
+ val allDataMapSchemas = DataMapStoreManager.getInstance
+ .getDataMapSchemasOfTable(carbonTable).asScala
+ .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+ !dataMapSchema.isIndexDataMap).asJava
+ if (!allDataMapSchemas.isEmpty) {
+ DataMapStatusManager.truncateDataMap(allDataMapSchemas)
+ }
// trigger event for Update table
val updateTablePostEvent: UpdateTablePostEvent =
UpdateTablePostEvent(sparkSession, carbonTable)
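
Updates and deletes cannot be applied to an MV incrementally, so both IUD commands truncate the datamap before firing their post events; the events then reach LoadPostDataMapListener (registered in CarbonEnv above), which fully reloads every non-lazy MV. The sequence for a hypothetical table:

    sql("update sales set (quantity) = (quantity + 1) where product = 'Mobile'")
    // -> truncateDataMap(mv schemas of sales)
    // -> UpdateTablePostEvent
    // -> LoadPostDataMapListener.onEvent -> provider.rebuild() -> enableDataMap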
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
new file mode 100644
index 0000000..f94b73a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
@@ -0,0 +1,141 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.command.mv
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.command.AlterTableModel
+import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.datamap.DataMapManager
+import org.apache.carbondata.events.{AlterTableCompactionPreStatusUpdateEvent, DeleteFromTablePostEvent, Event, OperationContext, OperationEventListener, UpdateTablePostEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostExecutionEvent
+import org.apache.carbondata.processing.merger.CompactionType
+
+/**
+ * Listener to trigger compaction on mv datamap after main table compaction
+ */
+object AlterDataMaptableCompactionPostListener extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event            AlterTableCompactionPreStatusUpdateEvent fired by the main table
+ * @param operationContext shared context; the compacted main-table segment id is recorded here
+ */
+ override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ val compactionEvent = event.asInstanceOf[AlterTableCompactionPreStatusUpdateEvent]
+ val carbonTable = compactionEvent.carbonTable
+ val compactionType = compactionEvent.carbonMergerMapping.campactionType
+ if (compactionType == CompactionType.CUSTOM) {
+ return
+ }
+ val carbonLoadModel = compactionEvent.carbonLoadModel
+ val sparkSession = compactionEvent.sparkSession
+ val allDataMapSchemas = DataMapStoreManager.getInstance
+ .getDataMapSchemasOfTable(carbonTable).asScala
+ .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+ !dataMapSchema.isIndexDataMap)
+ if (allDataMapSchemas.nonEmpty) {
+ allDataMapSchemas.foreach { dataMapSchema =>
+ val childRelationIdentifier = dataMapSchema.getRelationIdentifier
+ val alterTableModel = AlterTableModel(Some(childRelationIdentifier.getDatabaseName),
+ childRelationIdentifier.getTableName,
+ None,
+ compactionType.toString,
+ Some(System.currentTimeMillis()),
+ "")
+ operationContext.setProperty(
+ dataMapSchema.getRelationIdentifier.getDatabaseName + "_" +
+ dataMapSchema.getRelationIdentifier.getTableName + "_Segment",
+ carbonLoadModel.getSegmentId)
+ CarbonAlterTableCompactionCommand(alterTableModel, operationContext = operationContext)
+ .run(sparkSession)
+ }
+ }
+ }
+}
+
+/**
+ * Listener to trigger data load on mv datamap after main table data load
+ */
+object LoadPostDataMapListener extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event            load, update or delete post event fired by the main table
+ * @param operationContext shared context for the triggering operation
+ */
+ override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ val sparkSession = SparkSession.getActiveSession.get
+ val carbonTable: CarbonTable =
+ event match {
+ case event: LoadTablePostExecutionEvent =>
+ event.getCarbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ case event: UpdateTablePostEvent =>
+ event.carbonTable
+ case event: DeleteFromTablePostEvent =>
+ event.carbonTable
+ case _ => null
+ }
+ if (null != carbonTable) {
+ val allDataMapSchemas = DataMapStoreManager.getInstance
+ .getDataMapSchemasOfTable(carbonTable).asScala
+ .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+ !dataMapSchema.isIndexDataMap)
+ if (allDataMapSchemas.nonEmpty) {
+ allDataMapSchemas.foreach { dataMapSchema =>
+ if (!dataMapSchema.isLazy) {
+ val provider = DataMapManager.get()
+ .getDataMapProvider(carbonTable, dataMapSchema, sparkSession)
+ try {
+ provider.rebuild()
+ } catch {
+ case ex: Exception =>
+ DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+ throw ex
+ }
+ DataMapStatusManager.enableDataMap(dataMapSchema.getDataMapName)
+ }
+ }
+ }
+ }
+ }
+}
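
Both objects follow the OperationEventListener pattern, and the rebuild path is deliberately fail-safe: on any exception the datamap is disabled before the error propagates, and it is re-enabled only after a successful rebuild, so a stale MV never answers queries. A listener of the same shape can be wired up in CarbonEnv (names below are illustrative):

    object MyAuditListener extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        // react to the event; an exception thrown here aborts the operation
      }
    }
    // wiring, as done in CarbonEnv.init (sketch):
    // OperationListenerBus.getInstance()
    //   .addListener(classOf[LoadTablePostExecutionEvent], MyAuditListener)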
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 62e2d85..d875fdf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -35,7 +35,9 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil}
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
@@ -69,10 +71,22 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
"Update operation is not supported for pre-aggregate table")
}
val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
- if (!indexSchemas.isEmpty) {
+ if (CarbonTable.hasMVDataMap(carbonTable)) {
+ DataMapStoreManager.getInstance
+ .getDataMapSchemasOfTable(carbonTable).asScala
+ .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+ !dataMapSchema.isIndexDataMap)
+ .foreach { dataMapSchema =>
+ DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+ }
+ } else if (!indexSchemas.isEmpty) {
throw new UnsupportedOperationException(
"Update operation is not supported for table which has index datamaps")
}
+ if (carbonTable.isChildTable) {
+ throw new UnsupportedOperationException(
+ "Update operation is not supported for mv datamap table")
+ }
}
val tableRelation = if (SparkUtil.isSparkVersionEqualTo("2.1")) {
relation
@@ -200,7 +214,15 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
"Delete operation is not supported for pre-aggregate table")
}
val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
- if (!indexSchemas.isEmpty) {
+ if (CarbonTable.hasMVDataMap(carbonTable)) {
+ DataMapStoreManager.getInstance
+ .getDataMapSchemasOfTable(carbonTable).asScala
+ .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+ !dataMapSchema.isIndexDataMap)
+ .foreach { dataMapSchema =>
+ DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+ }
+ } else if (!indexSchemas.isEmpty) {
throw new UnsupportedOperationException(
"Delete operation is not supported for table which has index datamaps")
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 5f5cc12..1c3b7cf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -77,7 +77,8 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
protected lazy val startCommand: Parser[LogicalPlan] =
loadManagement | showLoads | alterTable | restructure | updateTable | deleteRecords |
- alterPartition | datamapManagement | alterTableFinishStreaming | stream | cli | cacheManagement
+ alterPartition | datamapManagement | alterTableFinishStreaming | stream | cli |
+ cacheManagement | alterDataMap
protected lazy val loadManagement: Parser[LogicalPlan] =
deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
@@ -246,6 +247,17 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
CarbonDataMapRebuildCommand(datamap, tableIdent)
}
+ protected lazy val alterDataMap: Parser[LogicalPlan] =
+ ALTER ~> DATAMAP ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) ~
+ (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",") <~ ")").? <~
+ opt(";") ^^ {
+ case dbName ~ datamap ~ (compact ~ compactType) ~ segs =>
+ val altertablemodel =
+ AlterTableModel(convertDbNameToLowerCase(dbName), datamap + "_table", None, compactType,
+ Some(System.currentTimeMillis()), null, segs)
+ CarbonAlterTableCompactionCommand(altertablemodel)
+ }
+
protected lazy val deleteRecords: Parser[LogicalPlan] =
(DELETE ~> FROM ~> aliasTable) ~ restInput.? <~ opt(";") ^^ {
case table ~ rest =>
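
The new rule above wires the compaction DDL for MV datamap tables into the parser; the datamap name is resolved to its backing table by appending "_table". Typical usage (datamap1 is a hypothetical name):

    sql("alter datamap datamap1 compact 'minor'")
    sql("alter datamap datamap1 compact 'major'")
    // optional custom scope over specific datamap-table segments:
    sql("alter datamap datamap1 compact 'custom' where segment.id in (0,1)")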
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 01e47be..1d51592 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -35,15 +35,19 @@ import java.util.Map;
import java.util.Set;
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.status.DataMapSegmentStatusUtil;
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
@@ -299,7 +303,8 @@ public final class CarbonDataMergerUtil {
*/
public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel,
- CompactionType compactionType, String segmentFile) throws IOException {
+ CompactionType compactionType, String segmentFile) throws IOException,
+ NoSuchDataMapException {
boolean tableStatusUpdationStatus = false;
AbsoluteTableIdentifier identifier =
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
@@ -351,6 +356,23 @@ public final class CarbonDataMergerUtil {
loadMetadataDetails.setMajorCompacted("true");
}
+ if (carbonTable.isChildTable()) {
+ // If table is child table, then get segment mapping and set to extraInfo
+ // getDataMapSchema already throws NoSuchDataMapException, which this method declares
+ DataMapSchema dataMapSchema = DataMapStatusManager.getDataMapSchema(
+ carbonTable.getTableInfo().getFactTable().getTableProperties()
+ .get(CarbonCommonConstants.DATAMAP_NAME));
+ if (null != dataMapSchema) {
+ String segmentMap = DataMapSegmentStatusUtil
+ .getUpdatedSegmentMap(mergedLoadNumber, dataMapSchema, loadDetails);
+ loadMetadataDetails.setExtraInfo(segmentMap);
+ }
+ }
+
List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
// put the merged folder entry
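
When the compacted table is an MV child table, the merged segment's extraInfo must record which parent segments it now covers; DataMapSegmentStatusUtil.getUpdatedSegmentMap folds the mappings of the segments being merged into a single entry for the merged load. Conceptually (the actual serialized layout is owned by DataMapSegmentStatusUtil):

    // hypothetical mapping before merging child segments 0 and 1 into 0.1
    val before = Map(
      "0" -> Map("maintable" -> List("0", "1")),
      "1" -> Map("maintable" -> List("2", "3")))
    // after the merge, segment 0.1 covers the union of the parent segments
    val merged = Map("0.1" -> Map("maintable" -> List("0", "1", "2", "3")))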
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index a396978..3769671 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -263,15 +263,26 @@ public final class CarbonLoaderUtil {
String segmentId =
String.valueOf(SegmentStatusManager.createNewSegmentId(listOfLoadFolderDetailsArray));
loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
+ LoadMetadataDetails entryToBeRemoved = null;
// Segment id would be provided in case this is compaction flow for aggregate data map.
// If that is true then use the segment id as the load name.
if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !loadModel
.getSegmentId().isEmpty()) {
newMetaEntry.setLoadName(loadModel.getSegmentId());
+ } else if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildTable()
+ && !loadModel.getSegmentId().isEmpty()) {
+ for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
+ if (entry.getLoadName().equalsIgnoreCase(loadModel.getSegmentId())) {
+ newMetaEntry.setLoadName(loadModel.getSegmentId());
+ newMetaEntry.setExtraInfo(entry.getExtraInfo());
+ entryToBeRemoved = entry;
+ }
+ }
} else {
newMetaEntry.setLoadName(segmentId);
loadModel.setSegmentId(segmentId);
}
+ listOfLoadFolderDetails.remove(entryToBeRemoved);
// Exception should be thrown if:
// 1. If insert overwrite is in progress and any other load or insert operation
// is triggered
@@ -299,6 +310,7 @@ public final class CarbonLoaderUtil {
for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
if (entry.getLoadName().equals(newMetaEntry.getLoadName())
&& entry.getLoadStartTime() == newMetaEntry.getLoadStartTime()) {
+ newMetaEntry.setExtraInfo(entry.getExtraInfo());
found = true;
break;
}