Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/16 19:05:41 UTC
[carbondata] 08/22: [CARBONDATA-3348] Support alter SORT_COLUMNS property
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit d1b455f09590b48a4ba3709fa29635a18da1d790
Author: QiangCai <qi...@qq.com>
AuthorDate: Tue Apr 16 20:27:31 2019 +0800
[CARBONDATA-3348] Support alter SORT_COLUMNS property
Modification
Support altering the SORT_COLUMNS property:
alter table <table name> set tblproperties('sort_scope'='<sort scope type>', 'sort_columns'='[c1][,...cn ]')
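For example (the table name below is illustrative; the statements mirror those exercised by the new test suite in this commit):
alter table alter_sc set tblproperties('sort_scope'='local_sort', 'sort_columns'='timestampField, intField, stringField')
alter table alter_sc set tblproperties('sort_scope'='global_sort', 'sort_columns'='charField, bigIntField, smallIntField')
alter table alter_sc set tblproperties('sort_scope'='no_sort', 'sort_columns'='')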
Limitation
When a measure becomes a dimension and a query contains this column, the task distribution of that query supports only block and blocklet, not merge_small_files or custom.
This closes #3178
---
.../core/constants/CarbonCommonConstants.java | 5 +
.../carbondata/core/datamap/DataMapFilter.java | 89 ++++
.../carbondata/core/datamap/TableDataMap.java | 91 ++--
.../datamap/dev/expr/DataMapExprWrapperImpl.java | 3 +-
.../core/metadata/schema/table/CarbonTable.java | 20 +
.../core/metadata/schema/table/TableInfo.java | 23 +
.../scan/executor/impl/AbstractQueryExecutor.java | 62 +--
.../executor/impl/QueryExecutorProperties.java | 5 -
.../core/scan/executor/util/RestructureUtil.java | 75 ++-
.../core/scan/model/QueryModelBuilder.java | 2 +-
.../scan/executor/util/RestructureUtilTest.java | 11 +-
.../carbondata/hadoop/api/CarbonInputFormat.java | 29 +-
.../test/resources/sort_columns/alldatatype1.csv | 13 +
.../test/resources/sort_columns/alldatatype2.csv | 13 +
.../TestAlterTableSortColumnsProperty.scala | 541 +++++++++++++++++++++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 10 +-
.../apache/carbondata/spark/util/CommonUtil.scala | 80 ++-
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 31 +-
.../org/apache/spark/util/AlterTableUtil.scala | 126 ++++-
19 files changed, 1039 insertions(+), 190 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c9efc34..608b5fb 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -478,6 +478,11 @@ public final class CarbonCommonConstants {
*/
public static final String CACHE_LEVEL_DEFAULT_VALUE = "BLOCK";
+ /**
+ * column-level property: marks a measure that has been changed to a dimension
+ */
+ public static final String COLUMN_DRIFT = "column_drift";
+
//////////////////////////////////////////////////////////////////////////////////////////
// Data loading parameter start here
//////////////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
new file mode 100644
index 0000000..c20d0d5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * the filter of DataMap
+ */
+public class DataMapFilter implements Serializable {
+
+ private CarbonTable table;
+
+ private Expression expression;
+
+ private FilterResolverIntf resolver;
+
+ public DataMapFilter(CarbonTable table, Expression expression) {
+ this.table = table;
+ this.expression = expression;
+ resolve();
+ }
+
+ public DataMapFilter(FilterResolverIntf resolver) {
+ this.resolver = resolver;
+ }
+
+ private void resolve() {
+ if (expression != null) {
+ table.processFilterExpression(expression, null, null);
+ resolver = CarbonTable.resolveFilter(expression, table.getAbsoluteTableIdentifier());
+ }
+ }
+
+ public Expression getExpression() {
+ return expression;
+ }
+
+ public void setExpression(Expression expression) {
+ this.expression = expression;
+ }
+
+ public FilterResolverIntf getResolver() {
+ return resolver;
+ }
+
+ public void setResolver(FilterResolverIntf resolver) {
+ this.resolver = resolver;
+ }
+
+ public boolean isEmpty() {
+ return resolver == null;
+ }
+
+ public boolean isResolvedOnSegment(SegmentProperties segmentProperties) {
+ if (expression == null || table == null) {
+ return true;
+ }
+ if (!table.isTransactionalTable()) {
+ return false;
+ }
+ if (table.hasColumnDrift() && RestructureUtil
+ .hasColumnDriftOnSegment(table, segmentProperties)) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index f9020bd..4375abb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -47,7 +47,6 @@ import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.events.Event;
@@ -100,38 +99,6 @@ public final class TableDataMap extends OperationEventListener {
return blockletDetailsFetcher;
}
-
- /**
- * Pass the valid segments and prune the datamap using filter expression
- *
- * @param segments
- * @param filterExp
- * @return
- */
- public List<ExtendedBlocklet> prune(List<Segment> segments, Expression filterExp,
- List<PartitionSpec> partitions) throws IOException {
- List<ExtendedBlocklet> blocklets = new ArrayList<>();
- SegmentProperties segmentProperties;
- Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
- for (Segment segment : segments) {
- List<Blocklet> pruneBlocklets = new ArrayList<>();
- // if filter is not passed then return all the blocklets
- if (filterExp == null) {
- pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
- } else {
- segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
- for (DataMap dataMap : dataMaps.get(segment)) {
- pruneBlocklets.addAll(dataMap
- .prune(filterExp, segmentProperties, partitions, table));
- }
- }
- blocklets.addAll(addSegmentId(
- blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
- segment));
- }
- return blocklets;
- }
-
public CarbonTable getTable() {
return table;
}
@@ -140,10 +107,10 @@ public final class TableDataMap extends OperationEventListener {
* Pass the valid segments and prune the datamap using filter expression
*
* @param segments
- * @param filterExp
+ * @param filter
* @return
*/
- public List<ExtendedBlocklet> prune(List<Segment> segments, final FilterResolverIntf filterExp,
+ public List<ExtendedBlocklet> prune(List<Segment> segments, final DataMapFilter filter,
final List<PartitionSpec> partitions) throws IOException {
final List<ExtendedBlocklet> blocklets = new ArrayList<>();
final Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
@@ -164,15 +131,15 @@ public final class TableDataMap extends OperationEventListener {
// As 0.1 million files block pruning can take only 1 second.
// Doing multi-thread for smaller values is not recommended as
// driver should have minimum threads opened to support multiple concurrent queries.
- if (filterExp == null) {
+ if (filter.isEmpty()) {
// if filter is not passed, then return all the blocklets.
return pruneWithoutFilter(segments, partitions, blocklets);
}
- return pruneWithFilter(segments, filterExp, partitions, blocklets, dataMaps);
+ return pruneWithFilter(segments, filter, partitions, blocklets, dataMaps);
}
// handle by multi-thread
- List<ExtendedBlocklet> extendedBlocklets =
- pruneMultiThread(segments, filterExp, partitions, blocklets, dataMaps, totalFiles);
+ List<ExtendedBlocklet> extendedBlocklets = pruneMultiThread(
+ segments, filter, partitions, blocklets, dataMaps, totalFiles);
return extendedBlocklets;
}
@@ -187,14 +154,22 @@ public final class TableDataMap extends OperationEventListener {
return blocklets;
}
- private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments,
- FilterResolverIntf filterExp, List<PartitionSpec> partitions,
- List<ExtendedBlocklet> blocklets, Map<Segment, List<DataMap>> dataMaps) throws IOException {
+ private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments, DataMapFilter filter,
+ List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets,
+ Map<Segment, List<DataMap>> dataMaps) throws IOException {
for (Segment segment : segments) {
List<Blocklet> pruneBlocklets = new ArrayList<>();
SegmentProperties segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
- for (DataMap dataMap : dataMaps.get(segment)) {
- pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
+ if (filter.isResolvedOnSegment(segmentProperties)) {
+ for (DataMap dataMap : dataMaps.get(segment)) {
+ pruneBlocklets.addAll(
+ dataMap.prune(filter.getResolver(), segmentProperties, partitions));
+ }
+ } else {
+ for (DataMap dataMap : dataMaps.get(segment)) {
+ pruneBlocklets.addAll(
+ dataMap.prune(filter.getExpression(), segmentProperties, partitions, table));
+ }
}
blocklets.addAll(
addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
@@ -204,7 +179,7 @@ public final class TableDataMap extends OperationEventListener {
}
private List<ExtendedBlocklet> pruneMultiThread(List<Segment> segments,
- final FilterResolverIntf filterExp, final List<PartitionSpec> partitions,
+ final DataMapFilter filter, final List<PartitionSpec> partitions,
List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> dataMaps,
int totalFiles) {
/*
@@ -295,14 +270,24 @@ public final class TableDataMap extends OperationEventListener {
SegmentProperties segmentProperties =
segmentPropertiesFetcher.getSegmentPropertiesFromDataMap(dataMapList.get(0));
Segment segment = segmentDataMapGroup.getSegment();
- for (int i = segmentDataMapGroup.getFromIndex();
- i <= segmentDataMapGroup.getToIndex(); i++) {
- List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(filterExp,
- segmentProperties,
- partitions);
- pruneBlocklets.addAll(addSegmentId(blockletDetailsFetcher
- .getExtendedBlocklets(dmPruneBlocklets, segment),
- segment));
+ if (filter.isResolvedOnSegment(segmentProperties)) {
+ for (int i = segmentDataMapGroup.getFromIndex();
+ i <= segmentDataMapGroup.getToIndex(); i++) {
+ List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(
+ filter.getResolver(), segmentProperties, partitions);
+ pruneBlocklets.addAll(addSegmentId(
+ blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
+ segment));
+ }
+ } else {
+ for (int i = segmentDataMapGroup.getFromIndex();
+ i <= segmentDataMapGroup.getToIndex(); i++) {
+ List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(
+ filter.getExpression(), segmentProperties, partitions, table);
+ pruneBlocklets.addAll(addSegmentId(
+ blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
+ segment));
+ }
}
synchronized (prunedBlockletMap) {
List<ExtendedBlocklet> pruneBlockletsExisting =
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
index 4643b47..bb2662b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.UUID;
import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
@@ -50,7 +51,7 @@ public class DataMapExprWrapperImpl implements DataMapExprWrapper {
@Override
public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
throws IOException {
- return dataMap.prune(segments, expression, partitionsToPrune);
+ return dataMap.prune(segments, new DataMapFilter(expression), partitionsToPrune);
}
public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 3623147..54ea772 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -121,6 +121,11 @@ public class CarbonTable implements Serializable {
private List<CarbonMeasure> allMeasures;
/**
+ * list of column-drift dimensions (measures that have become dimensions)
+ */
+ private List<CarbonDimension> columnDrift;
+
+ /**
* table bucket map.
*/
private Map<String, BucketingInfo> tableBucketMap;
@@ -189,6 +194,7 @@ public class CarbonTable implements Serializable {
this.tablePartitionMap = new HashMap<>();
this.createOrderColumn = new HashMap<String, List<CarbonColumn>>();
this.tablePrimitiveDimensionsMap = new HashMap<String, List<CarbonDimension>>();
+ this.columnDrift = new ArrayList<CarbonDimension>();
}
/**
@@ -898,6 +904,12 @@ public class CarbonTable implements Serializable {
for (CarbonDimension dimension : allDimensions) {
if (!dimension.isInvisible()) {
visibleDimensions.add(dimension);
+ Map<String, String> columnProperties = dimension.getColumnProperties();
+ if (columnProperties != null) {
+ if (columnProperties.get(CarbonCommonConstants.COLUMN_DRIFT) != null) {
+ columnDrift.add(dimension);
+ }
+ }
}
}
tableDimensionsMap.put(tableName, visibleDimensions);
@@ -912,6 +924,14 @@ public class CarbonTable implements Serializable {
return allMeasures;
}
+ public List<CarbonDimension> getColumnDrift() {
+ return columnDrift;
+ }
+
+ public boolean hasColumnDrift() {
+ return tableInfo.hasColumnDrift();
+ }
+
/**
* This method will all the visible allMeasures
*
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index daba29b..ec9d311 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -91,6 +91,8 @@ public class TableInfo implements Serializable, Writable {
*/
private boolean isTransactionalTable = true;
+ private boolean hasColumnDrift = false;
+
// this identifier is a lazy field which will be created when it is used first time
private AbsoluteTableIdentifier identifier;
@@ -122,6 +124,7 @@ public class TableInfo implements Serializable, Writable {
this.factTable = factTable;
updateParentRelationIdentifier();
updateIsSchemaModified();
+ updateHasColumnDrift();
}
private void updateIsSchemaModified() {
@@ -276,6 +279,7 @@ public class TableInfo implements Serializable, Writable {
out.writeLong(lastUpdatedTime);
out.writeUTF(getOrCreateAbsoluteTableIdentifier().getTablePath());
out.writeBoolean(isTransactionalTable);
+ out.writeBoolean(hasColumnDrift);
boolean isChildSchemaExists =
null != dataMapSchemaList && dataMapSchemaList.size() > 0;
out.writeBoolean(isChildSchemaExists);
@@ -305,6 +309,7 @@ public class TableInfo implements Serializable, Writable {
this.lastUpdatedTime = in.readLong();
this.tablePath = in.readUTF();
this.isTransactionalTable = in.readBoolean();
+ this.hasColumnDrift = in.readBoolean();
boolean isChildSchemaExists = in.readBoolean();
this.dataMapSchemaList = new ArrayList<>();
if (isChildSchemaExists) {
@@ -371,4 +376,22 @@ public class TableInfo implements Serializable, Writable {
return isSchemaModified;
}
+ private void updateHasColumnDrift() {
+ this.hasColumnDrift = false;
+ for (ColumnSchema columnSchema : factTable.getListOfColumns()) {
+ if (columnSchema.isDimensionColumn() && !columnSchema.isInvisible()) {
+ Map<String, String> columnProperties = columnSchema.getColumnProperties();
+ if (columnProperties != null) {
+ if (columnProperties.get(CarbonCommonConstants.COLUMN_DRIFT) != null) {
+ this.hasColumnDrift = true;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ public boolean hasColumnDrift() {
+ return hasColumnDrift;
+ }
}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index b15bdb5..f06f5c3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -46,7 +46,6 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -139,20 +138,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
queryStatistic
.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
- // calculating the total number of aggregated columns
- int measureCount = queryModel.getProjectionMeasures().size();
-
- int currentIndex = 0;
- DataType[] dataTypes = new DataType[measureCount];
-
- for (ProjectionMeasure carbonMeasure : queryModel.getProjectionMeasures()) {
- // adding the data type and aggregation type of all the measure this
- // can be used
- // to select the aggregator
- dataTypes[currentIndex] = carbonMeasure.getMeasure().getDataType();
- currentIndex++;
- }
- queryProperties.measureDataTypes = dataTypes;
+
// as aggregation will be executed in following order
// 1.aggregate dimension expression
// 2. expression
@@ -461,14 +447,15 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
throws QueryExecutionException {
BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
- List<CarbonDimension> tableBlockDimensions = segmentProperties.getDimensions();
-
+ // set actual query dimensions and measures. It may differ in case of restructure scenarios
+ RestructureUtil.actualProjectionOfSegment(blockExecutionInfo, queryModel, segmentProperties);
// below is to get only those dimension in query which is present in the
// table block
List<ProjectionDimension> projectDimensions = RestructureUtil
.createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo,
- queryModel.getProjectionDimensions(), tableBlockDimensions,
- segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size(),
+ blockExecutionInfo.getActualQueryDimensions(), segmentProperties.getDimensions(),
+ segmentProperties.getComplexDimensions(),
+ blockExecutionInfo.getActualQueryMeasures().length,
queryModel.getTable().getTableInfo().isTransactionalTable());
boolean isStandardTable = CarbonUtil.isStandardCarbonTable(queryModel.getTable());
String blockId = CarbonUtil
@@ -486,10 +473,12 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
blockExecutionInfo.setProjectionDimensions(projectDimensions
.toArray(new ProjectionDimension[projectDimensions.size()]));
// get measures present in the current block
- List<ProjectionMeasure> currentBlockQueryMeasures =
- getCurrentBlockQueryMeasures(blockExecutionInfo, queryModel, blockIndex);
+ List<ProjectionMeasure> projectionMeasures = RestructureUtil
+ .createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo,
+ blockExecutionInfo.getActualQueryMeasures(), segmentProperties.getMeasures(),
+ queryModel.getTable().getTableInfo().isTransactionalTable());
blockExecutionInfo.setProjectionMeasures(
- currentBlockQueryMeasures.toArray(new ProjectionMeasure[currentBlockQueryMeasures.size()]));
+ projectionMeasures.toArray(new ProjectionMeasure[projectionMeasures.size()]));
blockExecutionInfo.setDataBlock(blockIndex);
// setting whether raw record query or not
blockExecutionInfo.setRawRecordDetailQuery(queryModel.isForcedDetailRawQuery());
@@ -581,7 +570,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// list of measures to be projected
List<Integer> allProjectionListMeasureIndexes = new ArrayList<>();
int[] measureChunkIndexes = QueryUtil.getMeasureChunkIndexes(
- currentBlockQueryMeasures, expressionMeasures,
+ projectionMeasures, expressionMeasures,
segmentProperties.getMeasuresOrdinalToChunkMapping(), filterMeasures,
allProjectionListMeasureIndexes);
reusableBufferSize = Math.max(segmentProperties.getMeasuresOrdinalToChunkMapping().size(),
@@ -637,11 +626,6 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
blockExecutionInfo.setComplexColumnParentBlockIndexes(
getComplexDimensionParentBlockIndexes(projectDimensions));
blockExecutionInfo.setVectorBatchCollector(queryModel.isVectorReader());
- // set actual query dimensions and measures. It may differ in case of restructure scenarios
- blockExecutionInfo.setActualQueryDimensions(queryModel.getProjectionDimensions()
- .toArray(new ProjectionDimension[queryModel.getProjectionDimensions().size()]));
- blockExecutionInfo.setActualQueryMeasures(queryModel.getProjectionMeasures()
- .toArray(new ProjectionMeasure[queryModel.getProjectionMeasures().size()]));
DataTypeUtil.setDataTypeConverter(queryModel.getConverter());
blockExecutionInfo.setRequiredRowId(queryModel.isRequiredRowId());
return blockExecutionInfo;
@@ -691,28 +675,6 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
return 0;
}
- /**
- * Below method will be used to get the measures present in the current block
- *
- * @param executionInfo
- * @param queryModel query model
- * @param tableBlock table block
- * @return
- */
- private List<ProjectionMeasure> getCurrentBlockQueryMeasures(BlockExecutionInfo executionInfo,
- QueryModel queryModel, AbstractIndex tableBlock) throws QueryExecutionException {
- // getting the measure info which will be used while filling up measure data
- List<ProjectionMeasure> updatedQueryMeasures = RestructureUtil
- .createMeasureInfoAndGetCurrentBlockQueryMeasures(executionInfo,
- queryModel.getProjectionMeasures(),
- tableBlock.getSegmentProperties().getMeasures(),
- queryModel.getTable().getTableInfo().isTransactionalTable());
- // setting the measure aggregator for all aggregation function selected
- // in query
- executionInfo.getMeasureInfo().setMeasureDataTypes(queryProperties.measureDataTypes);
- return updatedQueryMeasures;
- }
-
private int[] getComplexDimensionParentBlockIndexes(List<ProjectionDimension> queryDimensions) {
List<Integer> parentBlockIndexList = new ArrayList<Integer>();
for (ProjectionDimension queryDimension : queryDimensions) {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/QueryExecutorProperties.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/QueryExecutorProperties.java
index 4b59aa7..22939e1 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/QueryExecutorProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/QueryExecutorProperties.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutorService;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
@@ -40,10 +39,6 @@ public class QueryExecutorProperties {
public Map<String, Dictionary> columnToDictionaryMapping;
/**
- * Measure datatypes
- */
- public DataType[] measureDataTypes;
- /**
* all the complex dimension which is on filter
*/
public Set<CarbonDimension> complexFilterDimension;
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index e823eb2..11b7372 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -38,6 +39,7 @@ import org.apache.carbondata.core.scan.executor.infos.DimensionInfo;
import org.apache.carbondata.core.scan.executor.infos.MeasureInfo;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -63,16 +65,16 @@ public class RestructureUtil {
* @return list of query dimension which is present in the table block
*/
public static List<ProjectionDimension> createDimensionInfoAndGetCurrentBlockQueryDimension(
- BlockExecutionInfo blockExecutionInfo, List<ProjectionDimension> queryDimensions,
+ BlockExecutionInfo blockExecutionInfo, ProjectionDimension[] queryDimensions,
List<CarbonDimension> tableBlockDimensions, List<CarbonDimension> tableComplexDimension,
int measureCount, boolean isTransactionalTable) {
List<ProjectionDimension> presentDimension =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- boolean[] isDimensionExists = new boolean[queryDimensions.size()];
- Object[] defaultValues = new Object[queryDimensions.size()];
+ boolean[] isDimensionExists = new boolean[queryDimensions.length];
+ Object[] defaultValues = new Object[queryDimensions.length];
// create dimension information instance
DimensionInfo dimensionInfo = new DimensionInfo(isDimensionExists, defaultValues);
- dimensionInfo.dataType = new DataType[queryDimensions.size() + measureCount];
+ dimensionInfo.dataType = new DataType[queryDimensions.length + measureCount];
int newDictionaryColumnCount = 0;
int newNoDictionaryColumnCount = 0;
// selecting only those dimension which is present in the query
@@ -412,14 +414,15 @@ public class RestructureUtil {
* @return measures present in the block
*/
public static List<ProjectionMeasure> createMeasureInfoAndGetCurrentBlockQueryMeasures(
- BlockExecutionInfo blockExecutionInfo, List<ProjectionMeasure> queryMeasures,
+ BlockExecutionInfo blockExecutionInfo, ProjectionMeasure[] queryMeasures,
List<CarbonMeasure> currentBlockMeasures, boolean isTransactionalTable) {
MeasureInfo measureInfo = new MeasureInfo();
- List<ProjectionMeasure> presentMeasure = new ArrayList<>(queryMeasures.size());
- int numberOfMeasureInQuery = queryMeasures.size();
+ List<ProjectionMeasure> presentMeasure = new ArrayList<>(queryMeasures.length);
+ int numberOfMeasureInQuery = queryMeasures.length;
List<Integer> measureOrdinalList = new ArrayList<>(numberOfMeasureInQuery);
Object[] defaultValues = new Object[numberOfMeasureInQuery];
boolean[] measureExistsInCurrentBlock = new boolean[numberOfMeasureInQuery];
+ DataType[] measureDataTypes = new DataType[numberOfMeasureInQuery];
int index = 0;
for (ProjectionMeasure queryMeasure : queryMeasures) {
// if query measure exists in current dimension measures
@@ -437,12 +440,14 @@ public class RestructureUtil {
presentMeasure.add(currentBlockMeasure);
measureOrdinalList.add(carbonMeasure.getOrdinal());
measureExistsInCurrentBlock[index] = true;
+ measureDataTypes[index] = carbonMeasure.getDataType();
break;
}
}
if (!measureExistsInCurrentBlock[index]) {
defaultValues[index] = getMeasureDefaultValue(queryMeasure.getMeasure().getColumnSchema(),
queryMeasure.getMeasure().getDefaultValue());
+ measureDataTypes[index] = queryMeasure.getMeasure().getDataType();
blockExecutionInfo.setRestructuredBlock(true);
}
index++;
@@ -452,7 +457,63 @@ public class RestructureUtil {
measureInfo.setDefaultValues(defaultValues);
measureInfo.setMeasureOrdinals(measureOrdinals);
measureInfo.setMeasureExists(measureExistsInCurrentBlock);
+ measureInfo.setMeasureDataTypes(measureDataTypes);
blockExecutionInfo.setMeasureInfo(measureInfo);
return presentMeasure;
}
+
+ /**
+ * set actual projection of blockExecutionInfo
+ */
+ public static void actualProjectionOfSegment(BlockExecutionInfo blockExecutionInfo,
+ QueryModel queryModel, SegmentProperties segmentProperties) {
+ List<ProjectionDimension> projectionDimensions = queryModel.getProjectionDimensions();
+ List<ProjectionMeasure> projectionMeasures = queryModel.getProjectionMeasures();
+ if (queryModel.getTable().hasColumnDrift()) {
+ List<CarbonMeasure> tableBlockMeasures = segmentProperties.getMeasures();
+ List<ProjectionMeasure> updatedProjectionMeasures =
+ new ArrayList<>(projectionMeasures.size() + tableBlockMeasures.size());
+ updatedProjectionMeasures.addAll(projectionMeasures);
+ List<ProjectionDimension> updatedProjectionDimensions =
+ new ArrayList<>(projectionDimensions.size());
+ for (ProjectionDimension projectionDimension : projectionDimensions) {
+ CarbonMeasure carbonMeasure = null;
+ for (CarbonMeasure tableBlockMeasure : tableBlockMeasures) {
+ if (isColumnMatches(queryModel.getTable().isTransactionalTable(),
+ projectionDimension.getDimension(), tableBlockMeasure)) {
+ carbonMeasure = tableBlockMeasure;
+ break;
+ }
+ }
+ if (carbonMeasure != null) {
+ ProjectionMeasure projectionMeasure = new ProjectionMeasure(carbonMeasure);
+ projectionMeasure.setOrdinal(projectionDimension.getOrdinal());
+ updatedProjectionMeasures.add(projectionMeasure);
+ } else {
+ updatedProjectionDimensions.add(projectionDimension);
+ }
+ }
+ blockExecutionInfo.setActualQueryDimensions(updatedProjectionDimensions
+ .toArray(new ProjectionDimension[updatedProjectionDimensions.size()]));
+ blockExecutionInfo.setActualQueryMeasures(updatedProjectionMeasures
+ .toArray(new ProjectionMeasure[updatedProjectionMeasures.size()]));
+ } else {
+ blockExecutionInfo.setActualQueryDimensions(
+ projectionDimensions.toArray(new ProjectionDimension[projectionDimensions.size()]));
+ blockExecutionInfo.setActualQueryMeasures(
+ projectionMeasures.toArray(new ProjectionMeasure[projectionMeasures.size()]));
+ }
+ }
+
+ public static boolean hasColumnDriftOnSegment(CarbonTable table,
+ SegmentProperties segmentProperties) {
+ for (CarbonDimension queryColumn : table.getColumnDrift()) {
+ for (CarbonMeasure tableColumn : segmentProperties.getMeasures()) {
+ if (isColumnMatches(table.isTransactionalTable(), queryColumn, tableColumn)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
index 4f934ce..d736805 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
@@ -312,7 +312,7 @@ public class QueryModelBuilder {
queryModel.setReadPageByPage(readPageByPage);
queryModel.setProjection(projection);
- if (table.isTransactionalTable()) {
+ if (table.isTransactionalTable() && !table.hasColumnDrift()) {
// set the filter to the query model in order to filter blocklet before scan
boolean[] isFilterDimensions = new boolean[table.getDimensionOrdinalMax()];
boolean[] isFilterMeasures = new boolean[table.getAllMeasures().size()];
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
index 7332614..80ec647 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java
@@ -86,8 +86,8 @@ public class RestructureUtilTest {
ProjectionMeasure queryMeasure2 = new ProjectionMeasure(new CarbonMeasure(columnSchema4, 4));
List<ProjectionMeasure> queryMeasures = Arrays.asList(queryMeasure1, queryMeasure2);
- List<ProjectionDimension> queryDimensions =
- Arrays.asList(queryDimension1, queryDimension2, queryDimension3);
+ ProjectionDimension[] queryDimensions =
+ new ProjectionDimension[] { queryDimension1, queryDimension2, queryDimension3 };
List<ProjectionDimension> result = null;
result = RestructureUtil
@@ -124,10 +124,11 @@ public class RestructureUtilTest {
ProjectionMeasure queryMeasure1 = new ProjectionMeasure(carbonMeasure1);
ProjectionMeasure queryMeasure2 = new ProjectionMeasure(carbonMeasure2);
ProjectionMeasure queryMeasure3 = new ProjectionMeasure(carbonMeasure3);
- List<ProjectionMeasure> queryMeasures = Arrays.asList(queryMeasure1, queryMeasure2, queryMeasure3);
+ ProjectionMeasure[] queryMeasures =
+ new ProjectionMeasure[] { queryMeasure1, queryMeasure2, queryMeasure3 };
BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
- RestructureUtil.createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo, queryMeasures,
- currentBlockMeasures, true);
+ RestructureUtil.createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo,
+ queryMeasures, currentBlockMeasures, true);
MeasureInfo measureInfo = blockExecutionInfo.getMeasureInfo();
boolean[] measuresExist = { true, true, false };
assertThat(measureInfo.getMeasureExists(), is(equalTo(measuresExist)));
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index aba0ab7..90532fb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.DataMapUtil;
@@ -54,7 +55,6 @@ import org.apache.carbondata.core.profiler.ExplainCollector;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.model.QueryModelBuilder;
import org.apache.carbondata.core.stats.QueryStatistic;
@@ -468,15 +468,8 @@ m filterExpression
private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable carbonTable,
Expression expression, List<Segment> segmentIds) throws IOException {
ExplainCollector.addPruningInfo(carbonTable.getTableName());
- FilterResolverIntf resolver = null;
- if (expression != null) {
- carbonTable.processFilterExpression(expression, null, null);
- resolver = CarbonTable.resolveFilter(expression, carbonTable.getAbsoluteTableIdentifier());
- ExplainCollector.setFilterStatement(expression.getStatement());
- } else {
- ExplainCollector.setFilterStatement("none");
- }
-
+ final DataMapFilter filter = new DataMapFilter(carbonTable, expression);
+ ExplainCollector.setFilterStatement(expression == null ? "none" : expression.getStatement());
boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
@@ -487,11 +480,7 @@ m filterExpression
List<ExtendedBlocklet> prunedBlocklets = null;
// This is to log the event, so user will know what is happening by seeing logs.
LOG.info("Started block pruning ...");
- if (carbonTable.isTransactionalTable()) {
- prunedBlocklets = defaultDataMap.prune(segmentIds, resolver, partitionsToPrune);
- } else {
- prunedBlocklets = defaultDataMap.prune(segmentIds, expression, partitionsToPrune);
- }
+ prunedBlocklets = defaultDataMap.prune(segmentIds, filter, partitionsToPrune);
if (ExplainCollector.enabled()) {
ExplainCollector.setDefaultDataMapPruningBlockHit(getBlockCount(prunedBlocklets));
@@ -504,15 +493,15 @@ m filterExpression
DataMapChooser chooser = new DataMapChooser(getOrCreateCarbonTable(job.getConfiguration()));
// Get the available CG datamaps and prune further.
- DataMapExprWrapper cgDataMapExprWrapper = chooser.chooseCGDataMap(resolver);
+ DataMapExprWrapper cgDataMapExprWrapper = chooser.chooseCGDataMap(filter.getResolver());
if (cgDataMapExprWrapper != null) {
// Prune segments from already pruned blocklets
pruneSegments(segmentIds, prunedBlocklets);
List<ExtendedBlocklet> cgPrunedBlocklets;
// Again prune with CG datamap.
if (distributedCG && dataMapJob != null) {
- cgPrunedBlocklets = DataMapUtil.executeDataMapJob(carbonTable,
- resolver, segmentIds, cgDataMapExprWrapper, dataMapJob, partitionsToPrune);
+ cgPrunedBlocklets = DataMapUtil.executeDataMapJob(carbonTable, filter.getResolver(),
+ segmentIds, cgDataMapExprWrapper, dataMapJob, partitionsToPrune);
} else {
cgPrunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune);
}
@@ -529,12 +518,12 @@ m filterExpression
}
// Now try to prune with FG DataMap.
if (isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) {
- DataMapExprWrapper fgDataMapExprWrapper = chooser.chooseFGDataMap(resolver);
+ DataMapExprWrapper fgDataMapExprWrapper = chooser.chooseFGDataMap(filter.getResolver());
if (fgDataMapExprWrapper != null) {
// Prune segments from already pruned blocklets
pruneSegments(segmentIds, prunedBlocklets);
List<ExtendedBlocklet> fgPrunedBlocklets = DataMapUtil.executeDataMapJob(carbonTable,
- resolver, segmentIds, fgDataMapExprWrapper, dataMapJob, partitionsToPrune);
+ filter.getResolver(), segmentIds, fgDataMapExprWrapper, dataMapJob, partitionsToPrune);
// note that the 'fgPrunedBlocklets' has extra datamap related info compared with
// 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
prunedBlocklets = intersectFilteredBlocklets(carbonTable, prunedBlocklets,
diff --git a/integration/spark-common-test/src/test/resources/sort_columns/alldatatype1.csv b/integration/spark-common-test/src/test/resources/sort_columns/alldatatype1.csv
new file mode 100644
index 0000000..1176363
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/sort_columns/alldatatype1.csv
@@ -0,0 +1,13 @@
+smallIntField,intField,bigIntField,floatField,doubleField,decimalField,timestampField,dateField,stringField,varcharField,charField,arrayField,structField
+1,1,2,1.1,12.12,1.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde1,a$b$c$1,a$b$1
+1,1,2,2.1,11.12,2.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde1,a$b$c$1,a$b$1
+2,1,2,3.1,10.12,3.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde2,a$b$c$2,a$b$2
+2,1,2,4.1,9.12,4.123,2017-01-11 00:00:01,2017-01-11,abc2,abcd2,abcde2,a$b$c$2,a$b$2
+2,2,3,5.2,8.12,5.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde2,a$b$c$2,a$b$3
+2,2,3,6.2,7.12,6.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde2,a$b$c$2,a$b$3
+4,2,3,7.2,6.12,7.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde1,a$b$c$4,a$b$1
+4,2,3,8.1,5.12,8.123,2017-02-12 00:00:02,2017-02-12,abc4,abcd1,abcde1,a$b$c$4,a$b$1
+4,4,1,9.1,4.12,9.123,2017-03-13 00:00:04,2017-03-14,abc4,abcd4,abcde4,a$b$c$4,a$b$2
+4,4,1,10.1,3.12,10.123,2017-03-13 00:00:04,2017-03-14,abc4,abcd4,abcde4,a$b$c$4,a$b$2
+1,4,1,11.1,2.12,11.123,2017-03-13 00:00:04,2017-03-14,abc4,abcd4,abcde4,a$b$c$1,a$b$3
+1,4,1,12.1,1.12,12.123,2017-03-13 00:00:04,2017-03-14,abc1,abcd4,abcde4,a$b$c$1,a$b$3
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/resources/sort_columns/alldatatype2.csv b/integration/spark-common-test/src/test/resources/sort_columns/alldatatype2.csv
new file mode 100644
index 0000000..649bbdc
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/sort_columns/alldatatype2.csv
@@ -0,0 +1,13 @@
+smallIntField,intField,bigIntField,floatField,doubleField,decimalField,timestampField,dateField,stringField,varcharField,charField,arrayField,structField
+1,1,1,13.2,6.12,7.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde1,a$b$c$1,a$b$1
+1,1,1,14.1,5.12,8.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde1,a$b$c$1,a$b$1
+1,1,2,15.1,4.12,9.123,2017-03-11 00:00:03,2017-03-11,abc2,abcd1,abcde1,a$b$c$2,a$b$2
+1,2,2,16.1,3.12,10.123,2017-03-11 00:00:03,2017-03-11,abc1,abcd1,abcde2,a$b$c$2,a$b$2
+1,2,2,17.1,2.12,11.123,2017-03-12 00:00:03,2017-03-12,abc1,abcd2,abcde2,a$b$c$1,a$b$1
+1,2,1,18.1,1.12,12.123,2017-03-12 00:00:03,2017-03-12,abc1,abcd2,abcde1,a$b$c$1,a$b$1
+2,2,1,19.1,12.12,1.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde1,a$b$c$1,a$b$1
+2,2,1,20.1,11.12,2.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde1,a$b$c$1,a$b$1
+2,2,2,21.1,10.12,3.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde2,a$b$c$2,a$b$2
+2,1,2,22.1,9.12,4.123,2017-01-11 00:00:01,2017-01-11,abc2,abcd2,abcde2,a$b$c$2,a$b$2
+2,1,2,23.2,8.12,5.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde2,a$b$c$2,a$b$2
+2,1,1,24.2,7.12,6.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde2,a$b$c$2,a$b$2
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala
new file mode 100644
index 0000000..bf4bae6
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.alterTable
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll(): Unit = {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+ "yyyy-MM-dd")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ "yyyy-MM-dd HH:mm:ss")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
+ dropTable()
+ prepareTable()
+ }
+
+ override def afterAll(): Unit = {
+ dropTable()
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+ CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
+ }
+
+ private def prepareTable(): Unit = {
+ createTable(
+ "alter_sc_base",
+ Map("sort_scope"->"local_sort", "sort_columns"->"stringField")
+ )
+ createTable(
+ "alter_sc_base_complex",
+ Map("sort_scope"->"local_sort", "sort_columns"->"stringField"),
+ true
+ )
+ createTable(
+ "alter_sc_validate",
+ Map("dictionary_include"->"charField"),
+ true
+ )
+ createTable(
+ "alter_sc_iud",
+ Map("dictionary_include"->"charField")
+ )
+ createTable(
+ "alter_sc_iud_complex",
+ Map("dictionary_include"->"charField"),
+ true
+ )
+ createTable(
+ "alter_sc_long_string",
+ Map("LONG_STRING_COLUMNS"->"stringField"),
+ true
+ )
+ createTable(
+ "alter_sc_insert",
+ Map("sort_scope"->"local_sort", "sort_columns"->"stringField")
+ )
+ loadData("alter_sc_insert")
+ createTable(
+ "alter_sc_insert_complex",
+ Map("sort_scope"->"local_sort", "sort_columns"->"stringField"),
+ true
+ )
+ loadData("alter_sc_insert_complex")
+ createTable(
+ "alter_sc_range_column",
+ Map("sort_scope"->"local_sort", "sort_columns"->"stringField", "range_column"->"smallIntField")
+ )
+ createTable(
+ "alter_sc_range_column_base",
+ Map("sort_scope"->"local_sort", "sort_columns"->"stringField")
+ )
+
+ Array("alter_sc_add_column", "alter_sc_add_column_base").foreach { tableName =>
+ sql(
+ s"""create table $tableName(
+ | smallIntField smallInt,
+ | intField int,
+ | bigIntField bigint,
+ | floatField float,
+ | doubleField double,
+ | timestampField timestamp,
+ | dateField date,
+ | stringField string
+ | )
+ | stored as carbondata
+ """.stripMargin)
+ }
+ // decimalField decimal(25, 4),
+
+ createTable(
+ "alter_sc_bloom",
+ Map("sort_scope"->"local_sort", "sort_columns"->"stringField")
+ )
+ createBloomDataMap("alter_sc_bloom", "alter_sc_bloom_dm1")
+ createTable(
+ "alter_sc_bloom_base",
+ Map("sort_scope"->"local_sort", "sort_columns"->"stringField")
+ )
+ createBloomDataMap("alter_sc_bloom_base", "alter_sc_bloom_base_dm1")
+ createTable(
+ "alter_sc_agg",
+ Map("sort_scope"->"local_sort", "sort_columns"->"intField")
+ )
+ createAggDataMap("alter_sc_agg", "alter_sc_agg_dm1")
+ createTable(
+ "alter_sc_agg_base",
+ Map("sort_scope"->"local_sort", "sort_columns"->"intField")
+ )
+ createAggDataMap("alter_sc_agg_base", "alter_sc_agg_base_dm1")
+ }
+
+ private def dropTable(): Unit = {
+ sql(s"drop table if exists alter_sc_base")
+ sql(s"drop table if exists alter_sc_base_complex")
+ sql(s"drop table if exists alter_sc_validate")
+ sql(s"drop table if exists alter_sc_iud")
+ sql(s"drop table if exists alter_sc_iud_complex")
+ sql(s"drop table if exists alter_sc_long_string")
+ sql(s"drop table if exists alter_sc_insert")
+ sql(s"drop table if exists alter_sc_insert_complex")
+ sql(s"drop table if exists alter_sc_range_column")
+ sql(s"drop table if exists alter_sc_range_column_base")
+ sql(s"drop table if exists alter_sc_add_column")
+ sql(s"drop table if exists alter_sc_add_column_base")
+ sql(s"drop table if exists alter_sc_bloom")
+ sql(s"drop table if exists alter_sc_bloom_base")
+ sql(s"drop table if exists alter_sc_agg")
+ sql(s"drop table if exists alter_sc_agg_base")
+ }
+
+ private def createTable(
+ tableName: String,
+ tblProperties: Map[String, String] = Map.empty,
+ withComplex: Boolean = false
+ ): Unit = {
+ val complexSql =
+ if (withComplex) {
+ ", arrayField array<string>, structField struct<col1:string, col2:string, col3:string>"
+ } else {
+ ""
+ }
+ val tblPropertiesSql =
+ if (tblProperties.isEmpty) {
+ ""
+ } else {
+ val propertiesString =
+ tblProperties
+ .map { entry =>
+ s"'${ entry._1 }'='${ entry._2 }'"
+ }
+ .mkString(",")
+ s"tblproperties($propertiesString)"
+ }
+
+ sql(
+ s"""create table $tableName(
+ | smallIntField smallInt,
+ | intField int,
+ | bigIntField bigint,
+ | floatField float,
+ | doubleField double,
+ | timestampField timestamp,
+ | dateField date,
+ | stringField string,
+ | varcharField varchar(10),
+ | charField char(10)
+ | $complexSql
+ | )
+ | stored as carbondata
+ | $tblPropertiesSql
+ """.stripMargin)
+ // decimalField decimal(25, 4),
+ }
+
+ private def createBloomDataMap(tableName: String, dataMapName: String): Unit = {
+ sql(
+ s"""
+ | CREATE DATAMAP $dataMapName ON TABLE $tableName
+ | USING 'bloomfilter'
+ | DMPROPERTIES(
+ | 'INDEX_COLUMNS'='smallIntField,floatField,timestampField,dateField,stringField',
+ | 'BLOOM_SIZE'='6400',
+ | 'BLOOM_FPP'='0.001',
+ | 'BLOOM_COMPRESS'='TRUE')
+ """.stripMargin)
+ }
+
+ private def createAggDataMap(tableName: String, dataMapName: String): Unit = {
+ sql(s"create datamap PreAggSum$dataMapName on table $tableName using 'preaggregate' as " +
+ s"select stringField,sum(intField) as sum from $tableName group by stringField")
+ sql(s"create datamap PreAggAvg$dataMapName on table $tableName using 'preaggregate' as " +
+ s"select stringField,avg(intField) as avg from $tableName group by stringField")
+ sql(s"create datamap PreAggCount$dataMapName on table $tableName using 'preaggregate' as " +
+ s"select stringField,count(intField) as count from $tableName group by stringField")
+ sql(s"create datamap PreAggMin$dataMapName on table $tableName using 'preaggregate' as " +
+ s"select stringField,min(intField) as min from $tableName group by stringField")
+ sql(s"create datamap PreAggMax$dataMapName on table $tableName using 'preaggregate' as " +
+ s"select stringField,max(intField) as max from $tableName group by stringField")
+ }
+
+ private def loadData(tableNames: String*): Unit = {
+ tableNames.foreach { tableName =>
+ sql(
+ s"""load data local inpath '$resourcesPath/sort_columns'
+ | into table $tableName
+ | options ('global_sort_partitions'='2', 'COMPLEX_DELIMITER_LEVEL_1'='$$', 'COMPLEX_DELIMITER_LEVEL_2'=':')
+ """.stripMargin)
+ }
+ }
+
+ private def insertData(insertTable: String, tableNames: String*): Unit = {
+ tableNames.foreach { tableName =>
+ sql(
+ s"""insert into table $tableName select * from $insertTable
+ """.stripMargin)
+ }
+ }
+
+ test("validate sort_scope and sort_columns") {
+ // invalid combination
+ var ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_validate set tblproperties('sort_scope'='local_sort')")
+ }
+ assert(ex.getMessage.contains("Cannot set SORT_SCOPE as local_sort when table has no SORT_COLUMNS"))
+
+ ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_validate set tblproperties('sort_scope'='global_sort')")
+ }
+ assert(ex.getMessage.contains("Cannot set SORT_SCOPE as global_sort when table has no SORT_COLUMNS"))
+
+ ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_validate set tblproperties('sort_scope'='local_sort', 'sort_columns'='')")
+ }
+ assert(ex.getMessage.contains("Cannot set SORT_COLUMNS as empty when setting SORT_SCOPE as local_sort"))
+
+ ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_validate set tblproperties('sort_scope'='global_sort', 'sort_columns'=' ')")
+ }
+ assert(ex.getMessage.contains("Cannot set SORT_COLUMNS as empty when setting SORT_SCOPE as global_sort"))
+
+ sql("alter table alter_sc_validate set tblproperties('sort_columns'='stringField', 'sort_scope'='local_sort')")
+ ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_validate set tblproperties('sort_columns'=' ')")
+ }
+ assert(ex.getMessage.contains("Cannot set SORT_COLUMNS as empty when SORT_SCOPE is LOCAL_SORT"))
+
+ sql("alter table alter_sc_validate set tblproperties('sort_scope'='global_sort')")
+ ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_validate set tblproperties('sort_columns'='')")
+ }
+ assert(ex.getMessage.contains("Cannot set SORT_COLUMNS as empty when SORT_SCOPE is GLOBAL_SORT"))
+
+ // wrong/duplicate sort_columns
+ ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_validate set tblproperties('sort_columns'=' stringField1 , intField')")
+ }
+ assert(ex.getMessage.contains("stringField1 does not exist in table"))
+
+ ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_validate set tblproperties('sort_columns'=' stringField1 , intField, stringField1')")
+ }
+ assert(ex.getMessage.contains("SORT_COLUMNS Either having duplicate columns : stringField1 or it contains illegal argumnet"))
+
+ ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_validate set tblproperties('sort_columns'=' stringField , intField, stringField')")
+ }
+ assert(ex.getMessage.contains("SORT_COLUMNS Either having duplicate columns : stringField or it contains illegal argumnet"))
+
+ // not supported data type
+// ex = intercept[RuntimeException] {
+// sql("alter table alter_sc_validate set tblproperties('sort_columns'='decimalField')")
+// }
+// assert(ex.getMessage.contains("sort_columns is unsupported for DECIMAL data type column: decimalField"))
+
+ ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_validate set tblproperties('sort_columns'='doubleField')")
+ }
+ assert(ex.getMessage.contains("sort_columns is unsupported for DOUBLE datatype column: doubleField"))
+
+ ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_validate set tblproperties('sort_columns'='arrayField')")
+ }
+ assert(ex.getMessage.contains("sort_columns is unsupported for ARRAY datatype column: arrayField"))
+
+ ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_validate set tblproperties('sort_columns'='structField')")
+ }
+ assert(ex.getMessage.contains("sort_columns is unsupported for STRUCT datatype column: structField"))
+
+ ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_validate set tblproperties('sort_columns'='structField.col1')")
+ }
+ assert(ex.getMessage.contains("sort_columns: structField.col1 does not exist in table"))
+ }
+
+ test("long string column") {
+ val ex = intercept[RuntimeException] {
+ sql("alter table alter_sc_long_string set tblproperties('sort_columns'='intField, stringField')")
+ }
+ assert(ex.getMessage.contains("sort_columns is unsupported for long string datatype column: stringField"))
+ }
+
+ test("describe formatted") {
+ // valid combination
+ sql("alter table alter_sc_validate set tblproperties('sort_scope'='no_sort', 'sort_columns'='')")
+ checkExistence(sql("describe formatted alter_sc_validate"), true, "NO_SORT")
+
+ sql("alter table alter_sc_validate set tblproperties('sort_scope'='no_sort', 'sort_columns'='bigIntField,stringField')")
+ checkExistence(sql("describe formatted alter_sc_validate"), true, "no_sort", "bigIntField, stringField".toLowerCase())
+
+ sql("alter table alter_sc_validate set tblproperties('sort_scope'='local_sort', 'sort_columns'='stringField,bigIntField')")
+ checkExistence(sql("describe formatted alter_sc_validate"), true, "local_sort", "stringField, bigIntField".toLowerCase())
+
+ // global dictionary or direct dictionary
+ sql("alter table alter_sc_validate set tblproperties('sort_scope'='global_sort', 'sort_columns'=' charField , bigIntField , timestampField ')")
+ checkExistence(sql("describe formatted alter_sc_validate"), true, "global_sort", "charField, bigIntField, timestampField".toLowerCase())
+
+ // supported data type
+ sql("alter table alter_sc_validate set tblproperties('sort_scope'='local_sort', 'sort_columns'='smallIntField, intField, bigIntField, timestampField, dateField, stringField, varcharField, charField')")
+ checkExistence(sql("describe formatted alter_sc_validate"), true, "local_sort", "smallIntField, intField, bigIntField, timestampField, dateField, stringField, varcharField, charField".toLowerCase())
+ }
+
+ test("IUD and Query") {
+ testIUDAndQuery("alter_sc_iud", "alter_sc_base", "alter_sc_insert")
+ }
+
+ test("IUD and Query with complex data type") {
+ testIUDAndQuery("alter_sc_iud_complex", "alter_sc_base_complex", "alter_sc_insert_complex")
+ }
+
+ private def testIUDAndQuery(tableName: String, baseTableName: String, insertTableName: String): Unit = {
+ loadData(tableName, baseTableName)
+ checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+ // alter table to local_sort with new SORT_COLUMNS
+ sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='timestampField, intField, stringField')")
+ loadData(tableName, baseTableName)
+ checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+ // alter table to revert SORT_COLUMNS
+ sql(s"alter table $tableName set tblproperties('sort_columns'='stringField, intField, timestampField')")
+ loadData(tableName, baseTableName)
+ checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+ // alter table to change SORT_COLUMNS
+ sql(s"alter table $tableName set tblproperties('sort_columns'='smallIntField, stringField, intField')")
+ loadData(tableName, baseTableName)
+ checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+ // alter table to change SORT_SCOPE and SORT_COLUMNS
+ sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='charField, bigIntField, smallIntField')")
+ loadData(tableName, baseTableName)
+ checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+ // alter table to change SORT_SCOPE
+ sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='charField, bigIntField, smallIntField')")
+ loadData(tableName, baseTableName)
+ checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+ // query
+ checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+ checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField"))
+
+ // set input segments
+ (0 to 5).foreach { segment =>
+ sql(s"set carbon.input.segments.default.$tableName=$segment").show(100, false)
+ sql(s"set carbon.input.segments.default.$baseTableName=$segment").show(100, false)
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+ }
+ sql(s"set carbon.input.segments.default.$tableName=*").show(100, false)
+ sql(s"set carbon.input.segments.default.$baseTableName=*").show(100, false)
+
+ // delete
+ sql(s"delete from $tableName where smallIntField = 2")
+ sql(s"delete from $baseTableName where smallIntField = 2")
+ checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+ sql(s"delete from $tableName")
+ checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(0)))
+ sql(s"delete from $baseTableName")
+ checkAnswer(sql(s"select count(*) from $baseTableName"), Seq(Row(0)))
+
+ // insert & load data
+ sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='timestampField')")
+ insertData(insertTableName, tableName, baseTableName)
+ loadData(tableName, baseTableName)
+ checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+ sql(s"alter table $tableName set tblproperties('sort_scope'='no_sort', 'sort_columns'='')")
+ insertData(insertTableName, tableName, baseTableName)
+ loadData(tableName, baseTableName)
+ checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+ sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='charField, bigIntField, smallIntField')")
+ insertData(insertTableName, tableName, baseTableName)
+ loadData(tableName, baseTableName)
+ checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+ // update
+ sql(s"update $tableName set (smallIntField, intField, bigIntField, floatField, doubleField) = (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where smallIntField = 2").show()
+ sql(s"update $baseTableName set (smallIntField, intField, bigIntField, floatField, doubleField) = (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where smallIntField = 2").show()
+ checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
+ // query
+ checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+ checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField"))
+
+ // set input segments
+ (6 to 11).foreach { segment =>
+ sql(s"set carbon.input.segments.default.$tableName=$segment").show(100, false)
+ sql(s"set carbon.input.segments.default.$baseTableName=$segment").show(100, false)
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+ }
+ sql(s"set carbon.input.segments.default.$tableName=*").show(100, false)
+ sql(s"set carbon.input.segments.default.$baseTableName=*").show(100, false)
+
+ // compaction
+ sql(s"show segments for table $tableName").show(100, false)
+ sql(s"show segments for table $baseTableName").show(100, false)
+ sql(s"alter table $tableName compact 'minor'")
+ sql(s"alter table $baseTableName compact 'minor'")
+ sql(s"show segments for table $tableName").show(100, false)
+ sql(s"show segments for table $baseTableName").show(100, false)
+ checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+ checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField"))
+ }
+
+ test("range column") {
+ val tableName = "alter_sc_range_column"
+ val baseTableName = "alter_sc_range_column_base"
+ loadData(tableName, baseTableName)
+ sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='smallIntField, charField')")
+ loadData(tableName, baseTableName)
+
+ checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+ checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+ }
+
+ test("add/drop column for sort_columns") {
+ val tableName = "alter_sc_add_column"
+ val baseTableName = "alter_sc_add_column_base"
+ loadData(tableName, baseTableName)
+ sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='smallIntField, stringField')")
+ loadData(tableName, baseTableName)
+ // add column
+ sql(s"alter table $tableName add columns( varcharField varchar(10), charField char(10))")
+ sql(s"alter table $baseTableName add columns( varcharField varchar(10), charField char(10))")
+ loadData(tableName, baseTableName)
+
+ checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+ checkAnswer(sql(s"select * from $tableName order by floatField, charField"), sql(s"select * from $baseTableName order by floatField, charField"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField, charField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField, charField"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is null order by floatField"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is not null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is not null order by floatField"))
+
+ // add new column to sort_columns
+ sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='smallIntField, charField')")
+ loadData(tableName, baseTableName)
+ checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+ checkAnswer(sql(s"select * from $tableName order by floatField, charField"), sql(s"select * from $baseTableName order by floatField, charField"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField, charField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField, charField"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is null order by floatField"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is not null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is not null order by floatField"))
+
+ // drop column of old sort_columns
+ sql(s"alter table $tableName drop columns(stringField)")
+ sql(s"alter table $baseTableName drop columns(stringField)")
+ loadData(tableName, baseTableName)
+ checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+ checkAnswer(sql(s"select * from $tableName order by floatField, charField"), sql(s"select * from $baseTableName order by floatField, charField"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField, charField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField, charField"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is null order by floatField"))
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is not null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is not null order by floatField"))
+ }
+
+ test("bloom filter") {
+ val tableName = "alter_sc_bloom"
+ val dataMapName = "alter_sc_bloom_dm1"
+ val baseTableName = "alter_sc_bloom_base"
+ loadData(tableName, baseTableName)
+ checkExistence(sql(s"SHOW DATAMAP ON TABLE $tableName"), true, "bloomfilter", dataMapName)
+ checkExistence(sql(s"EXPLAIN SELECT * FROM $tableName WHERE smallIntField = 3"), true, "bloomfilter", dataMapName)
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 3 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 3 order by floatField"))
+
+ sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='smallIntField, charField')")
+ loadData(tableName, baseTableName)
+ checkExistence(sql(s"EXPLAIN SELECT * FROM $tableName WHERE smallIntField = 3"), true, "bloomfilter", dataMapName)
+ checkAnswer(sql(s"select * from $tableName where smallIntField = 3 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 3 order by floatField"))
+ }
+
+ test("pre-aggregate") {
+ val tableName = "alter_sc_agg"
+ val dataMapName = "alter_sc_agg_dm1"
+ val baseTableName = "alter_sc_agg_base"
+ loadData(tableName, baseTableName)
+ sql(s"SHOW DATAMAP ON TABLE $tableName").show(100, false)
+ checkExistence(sql(s"SHOW DATAMAP ON TABLE $tableName"), true, "preaggregate", dataMapName)
+ checkExistence(sql(s"EXPLAIN select stringField,sum(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), true, "preaggregate", dataMapName)
+ checkAnswer(sql(s"select stringField,sum(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), sql(s"select stringField,sum(intField) as sum from $baseTableName where stringField = 'abc2' group by stringField"))
+
+ sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='smallIntField, charField')")
+ loadData(tableName, baseTableName)
+ sql(s"EXPLAIN select stringField,max(intField) as sum from $tableName where stringField = 'abc2' group by stringField").show(100, false)
+ checkExistence(sql(s"EXPLAIN select stringField,max(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), true, "preaggregate", dataMapName)
+ checkAnswer(sql(s"select stringField,max(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), sql(s"select stringField,max(intField) as sum from $baseTableName where stringField = 'abc2' group by stringField"))
+ }
+}
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 6cee8dc..d0ed815 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -232,7 +232,11 @@ class CarbonScanRDD[T: ClassTag](
statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
statisticRecorder.recordStatisticsForDriver(statistic, queryId)
statistic = new QueryStatistic()
- val carbonDistribution = if (directFill) {
+ // When the table has column drift, different blocks may have different schemas,
+ // and a query cannot scan blocks with different schemas within a single task.
+ // So when the table has column drift, CARBON_TASK_DISTRIBUTION_MERGE_FILES and
+ // CARBON_TASK_DISTRIBUTION_CUSTOM cannot be used.
+ val carbonDistribution = if (directFill && !tableInfo.hasColumnDrift) {
CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES
} else {
CarbonProperties.getInstance().getProperty(
@@ -260,7 +264,7 @@ class CarbonScanRDD[T: ClassTag](
CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
"false").toBoolean ||
carbonDistribution.equalsIgnoreCase(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM)
- if (useCustomDistribution) {
+ if (useCustomDistribution && !tableInfo.hasColumnDrift) {
// create a list of block based on split
val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
@@ -297,7 +301,7 @@ class CarbonScanRDD[T: ClassTag](
val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
result.add(partition)
}
- } else if (carbonDistribution.equalsIgnoreCase(
+ } else if (!tableInfo.hasColumnDrift && carbonDistribution.equalsIgnoreCase(
CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)) {
// sort blocks in reverse order of length
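The guards above rely on TableInfo.hasColumnDrift. As a minimal sketch of the
underlying idea, for illustration only: the flag can be derived from the new
COLUMN_DRIFT column-level property. The helper below and its inputs are
hypothetical, not the actual TableInfo implementation.

  // Sketch: a table "has column drift" if any column carries the
  // column_drift property, i.e. a former measure now stored as a dimension.
  def hasColumnDrift(columnProperties: Seq[Map[String, String]]): Boolean =
    columnProperties.exists(_.get("column_drift").contains("true"))

  val columns = Seq(
    Map("sort_columns" -> "true", "column_drift" -> "true"), // drifted measure
    Map.empty[String, String])                               // untouched column
  assert(hasColumnDrift(columns))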
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index d90c6b2..da42363 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -17,7 +17,6 @@
package org.apache.carbondata.spark.util
-
import java.io.File
import java.math.BigDecimal
import java.text.SimpleDateFormat
@@ -47,16 +46,16 @@ import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
import org.apache.carbondata.core.metadata.schema.PartitionInfo
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.partition.PartitionUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, ThreadLocalTaskInfo}
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil, ThreadLocalTaskInfo}
import org.apache.carbondata.core.util.comparator.Comparator
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-
object CommonUtil {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -794,6 +793,7 @@ object CommonUtil {
}
storeLocation
}
+
/**
* This method will validate the cache level
*
@@ -909,6 +909,80 @@ object CommonUtil {
}
}
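+ // NOTE: despite its name, this returns true when the given data type can
+ // NOT be used in SORT_COLUMNS (it checks the unsupported-type list).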
+ def isDataTypeSupportedForSortColumn(columnDataType: String): Boolean = {
+ val dataTypes = Array("array", "struct", "map", "double", "float", "decimal", "binary")
+ dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
+ }
+
+ def validateSortScope(newProperties: Map[String, String]): Unit = {
+ val sortScopeOption = newProperties.get(CarbonCommonConstants.SORT_SCOPE)
+ if (sortScopeOption.isDefined) {
+ if (!CarbonUtil.isValidSortOption(sortScopeOption.get)) {
+ throw new MalformedCarbonCommandException(
+ s"Invalid SORT_SCOPE ${ sortScopeOption.get }, " +
+ s"valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT'")
+ }
+ }
+ }
+
+ def validateSortColumns(
+ sortKey: Array[String],
+ fields: Seq[(String, String)],
+ varcharCols: Seq[String]
+ ): Unit = {
+ if (sortKey.diff(sortKey.distinct).length > 0 ||
+ (sortKey.length > 1 && sortKey.contains(""))) {
+ throw new MalformedCarbonCommandException(
+ "SORT_COLUMNS Either having duplicate columns : " +
+ sortKey.diff(sortKey.distinct).mkString(",") + " or it contains illegal argumnet.")
+ }
+
+ sortKey.foreach { column =>
+ if (!fields.exists(x => x._1.equalsIgnoreCase(column))) {
+ val errorMsg = "sort_columns: " + column +
+ " does not exist in table. Please check the create table statement."
+ throw new MalformedCarbonCommandException(errorMsg)
+ } else {
+ val dataType = fields.find(x =>
+ x._1.equalsIgnoreCase(column)).get._2
+ if (isDataTypeSupportedForSortColumn(dataType)) {
+ val errorMsg = s"sort_columns is unsupported for $dataType datatype column: " + column
+ throw new MalformedCarbonCommandException(errorMsg)
+ }
+ if (varcharCols.exists(x => x.equalsIgnoreCase(column))) {
+ throw new MalformedCarbonCommandException(
+ s"sort_columns is unsupported for long string datatype column: $column")
+ }
+ }
+ }
+ }
+
+ def validateSortColumns(carbonTable: CarbonTable, newProperties: Map[String, String]): Unit = {
+ val fields = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+ val tableProperties = carbonTable.getTableInfo.getFactTable.getTableProperties
+ var sortKeyOption = newProperties.get(CarbonCommonConstants.SORT_COLUMNS)
+ val varcharColsString = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS)
+ val varcharCols: Seq[String] = if (varcharColsString == null) {
+ Seq.empty[String]
+ } else {
+ varcharColsString.split(",").map(_.trim)
+ }
+
+ if (sortKeyOption.isEmpty) {
+ // by default, no columns are selected for sorting (NO_SORT scope)
+ sortKeyOption = Some("")
+ }
+ val sortKeyString = CarbonUtil.unquoteChar(sortKeyOption.get).trim
+ if (!sortKeyString.isEmpty) {
+ val sortKey = sortKeyString.split(',').map(_.trim)
+ validateSortColumns(
+ sortKey,
+ fields.map { field => (field.getColName, field.getDataType.getName) },
+ varcharCols
+ )
+ }
+ }
+
def bytesToDisplaySize(size: Long): String = bytesToDisplaySize(BigDecimal.valueOf(size))
+ // This method converts the byte count to a display size with up to 2 decimal places
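For illustration, the error paths of the new validateSortColumns overload,
mirroring the tests earlier in this patch. The field list below is a
hypothetical in-memory stand-in for real table columns, and each rejected
call throws MalformedCarbonCommandException when run on its own:

  // assumes: import org.apache.carbondata.spark.util.CommonUtil
  val fields = Seq(("stringField", "string"), ("intField", "int"), ("doubleField", "double"))
  // passes: distinct, existing, sortable columns
  CommonUtil.validateSortColumns(Array("stringField", "intField"), fields, Seq.empty)
  // throws: DOUBLE is in the unsupported-type list
  CommonUtil.validateSortColumns(Array("doubleField"), fields, Seq.empty)
  // throws: duplicate column stringField
  CommonUtil.validateSortColumns(Array("stringField", "stringField"), fields, Seq.empty)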
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 3e80ea6..d978128 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -760,32 +760,11 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
var sortKeyDimsTmp: Seq[String] = Seq[String]()
if (!sortKeyString.isEmpty) {
val sortKey = sortKeyString.split(',').map(_.trim)
- if (sortKey.diff(sortKey.distinct).length > 0 ||
- (sortKey.length > 1 && sortKey.contains(""))) {
- throw new MalformedCarbonCommandException(
- "SORT_COLUMNS Either having duplicate columns : " +
- sortKey.diff(sortKey.distinct).mkString(",") + " or it contains illegal argumnet.")
- }
-
- sortKey.foreach { column =>
- if (!fields.exists(x => x.column.equalsIgnoreCase(column))) {
- val errorMsg = "sort_columns: " + column +
- " does not exist in table. Please check the create table statement."
- throw new MalformedCarbonCommandException(errorMsg)
- } else {
- val dataType = fields.find(x =>
- x.column.equalsIgnoreCase(column)).get.dataType.get
- if (isDataTypeSupportedForSortColumn(dataType)) {
- val errorMsg = s"sort_columns is unsupported for $dataType datatype column: " + column
- throw new MalformedCarbonCommandException(errorMsg)
- }
- if (varcharCols.exists(x => x.equalsIgnoreCase(column))) {
- throw new MalformedCarbonCommandException(
- s"sort_columns is unsupported for long string datatype column: $column")
- }
- }
- }
-
+ CommonUtil.validateSortColumns(
+ sortKey,
+ fields.map { field => (field.column, field.dataType.get) },
+ varcharCols
+ )
sortKey.foreach { dimension =>
if (!sortKeyDimsTmp.exists(dimension.equalsIgnoreCase)) {
fields.foreach { field =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 1dc562dc..99bc863 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
+import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -32,13 +33,14 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.converter.{SchemaConverter, ThriftWrapperSchemaConverterImpl}
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
@@ -101,6 +103,76 @@ object AlterTableUtil {
}
/**
+ * update the schema when SORT_COLUMNS is changed
+ */
+ private def updateSchemaForSortColumns(
+ thriftTable: TableInfo,
+ lowerCasePropertiesMap: mutable.Map[String, String],
+ schemaConverter: SchemaConverter
+ ) = {
+ val sortColumnsOption = lowerCasePropertiesMap.get(CarbonCommonConstants.SORT_COLUMNS)
+ if (sortColumnsOption.isDefined) {
+ val sortColumnsString = CarbonUtil.unquoteChar(sortColumnsOption.get).trim
+ val columns = thriftTable.getFact_table.getTable_columns
+ // remove old sort_columns property from ColumnSchema
+ val columnSeq =
+ columns
+ .asScala
+ .map { column =>
+ val columnProperties = column.getColumnProperties
+ if (columnProperties != null) {
+ columnProperties.remove(CarbonCommonConstants.SORT_COLUMNS)
+ }
+ column
+ }
+ .zipWithIndex
+ if (!sortColumnsString.isEmpty) {
+ val newSortColumns = sortColumnsString.split(',').map(_.trim)
+ // map sort_columns index in column list
+ val sortColumnsIndexMap = newSortColumns
+ .zipWithIndex
+ .map { entry =>
+ val column = columnSeq.find(_._1.getColumn_name.equalsIgnoreCase(entry._1)).get
+ var columnProperties = column._1.getColumnProperties
+ if (columnProperties == null) {
+ columnProperties = new util.HashMap[String, String]()
+ column._1.setColumnProperties(columnProperties)
+ }
+ // change sort_columns to dimension
+ if (!column._1.isDimension) {
+ column._1.setDimension(true)
+ columnProperties.put(CarbonCommonConstants.COLUMN_DRIFT, "true")
+ }
+ // add sort_columns property
+ columnProperties.put(CarbonCommonConstants.SORT_COLUMNS, "true")
+ (column._2, entry._2)
+ }
+ .toMap
+ var index = newSortColumns.length
+ // re-order all columns, move sort_columns to the head of column list
+ val newColumns = columnSeq
+ .map { entry =>
+ val sortColumnIndexOption = sortColumnsIndexMap.get(entry._2)
+ val newIndex = if (sortColumnIndexOption.isDefined) {
+ sortColumnIndexOption.get
+ } else {
+ val tempIndex = index
+ index += 1
+ tempIndex
+ }
+ (newIndex, entry._1)
+ }
+ .sortWith(_._1 < _._1)
+ .map(_._2)
+ .asJava
+ // use new columns
+ columns.clear()
+ columns.addAll(newColumns)
+ }
+ }
+ }
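+ // Worked example of the re-ordering above (comments only):
+ // columns = [intField, stringField, charField, doubleField]
+ // new SORT_COLUMNS = 'charField, stringField'
+ // sortColumnsIndexMap assigns charField -> 0 and stringField -> 1; the
+ // remaining columns get 2 and 3 in their original order, so the final
+ // column list becomes [charField, stringField, intField, doubleField].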
+
+ /**
* @param carbonTable
* @param schemaEvolutionEntry
* @param thriftTable
@@ -361,9 +433,10 @@ object AlterTableUtil {
// validate the range column properties
validateRangeColumnProperties(carbonTable, lowerCasePropertiesMap)
- // validate the Sort Scope
- validateSortScopeProperty(carbonTable, lowerCasePropertiesMap)
-
+ // validate the Sort Scope and Sort Columns
+ validateSortScopeAndSortColumnsProperties(carbonTable, lowerCasePropertiesMap)
+ // if SORT_COLUMNS is changed, move the sort columns to the head of the column list
+ updateSchemaForSortColumns(thriftTable, lowerCasePropertiesMap, schemaConverter)
// below map will be used for cache invalidation. As tblProperties map is getting modified
// in the next few steps the original map need to be retained for any decision making
val existingTablePropertiesMap = mutable.Map(tblPropertiesMap.toSeq: _*)
@@ -394,9 +467,13 @@ object AlterTableUtil {
if (propKey.equalsIgnoreCase(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)) {
tblPropertiesMap
.put(propKey.toLowerCase, CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
- } else if (propKey.equalsIgnoreCase("sort_scope")) {
+ } else if (propKey.equalsIgnoreCase(CarbonCommonConstants.SORT_SCOPE)) {
tblPropertiesMap
.put(propKey.toLowerCase, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+ } else if (propKey.equalsIgnoreCase(CarbonCommonConstants.SORT_COLUMNS)) {
+ val errorMessage = "Error: Invalid option(s): " + propKey +
+ ", please set SORT_COLUMNS as empty instead of unset"
+ throw new MalformedCarbonCommandException(errorMessage)
} else {
tblPropertiesMap.remove(propKey.toLowerCase)
}
@@ -440,7 +517,8 @@ object AlterTableUtil {
"LOCAL_DICTIONARY_EXCLUDE",
"LOAD_MIN_SIZE_INMB",
"RANGE_COLUMN",
- "SORT_SCOPE")
+ "SORT_SCOPE",
+ "SORT_COLUMNS")
supportedOptions.contains(propKey.toUpperCase)
}
@@ -542,18 +620,34 @@ object AlterTableUtil {
}
}
- def validateSortScopeProperty(carbonTable: CarbonTable,
+ def validateSortScopeAndSortColumnsProperties(carbonTable: CarbonTable,
propertiesMap: mutable.Map[String, String]): Unit = {
- propertiesMap.foreach { property =>
- if (property._1.equalsIgnoreCase("SORT_SCOPE")) {
- if (!CarbonUtil.isValidSortOption(property._2)) {
- throw new MalformedCarbonCommandException(
- s"Invalid SORT_SCOPE ${ property._2 }, valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', " +
- s"'LOCAL_SORT' and 'GLOBAL_SORT'")
- } else if (!property._2.equalsIgnoreCase("NO_SORT") &&
- (carbonTable.getNumberOfSortColumns == 0)) {
+ CommonUtil.validateSortScope(propertiesMap)
+ CommonUtil.validateSortColumns(carbonTable, propertiesMap)
+ // match SORT_SCOPE and SORT_COLUMNS
+ val newSortScope = propertiesMap.get(CarbonCommonConstants.SORT_SCOPE)
+ val newSortColumns = propertiesMap.get(CarbonCommonConstants.SORT_COLUMNS)
+ if (newSortScope.isDefined) {
+ // 1. check SORT_COLUMNS when SORT_SCOPE is not changed to NO_SORT
+ if (!SortScope.NO_SORT.name().equalsIgnoreCase(newSortScope.get)) {
+ if (newSortColumns.isDefined) {
+ if (StringUtils.isBlank(CarbonUtil.unquoteChar(newSortColumns.get))) {
+ throw new InvalidConfigurationException(
+ s"Cannot set SORT_COLUMNS as empty when setting SORT_SCOPE as ${newSortScope.get} ")
+ }
+ } else {
+ if (carbonTable.getNumberOfSortColumns == 0) {
+ throw new InvalidConfigurationException(
+ s"Cannot set SORT_SCOPE as ${newSortScope.get} when table has no SORT_COLUMNS")
+ }
+ }
+ }
+ } else if (newSortColumns.isDefined) {
+ // 2. check SORT_SCOPE when SORT_COLUMNS is changed to empty
+ if (StringUtils.isBlank(CarbonUtil.unquoteChar(newSortColumns.get))) {
+ if (!SortScope.NO_SORT.equals(carbonTable.getSortScope)) {
throw new InvalidConfigurationException(
- s"Cannot set SORT_SCOPE as ${ property._2 } when table has no SORT_COLUMNS")
+ s"Cannot set SORT_COLUMNS as empty when SORT_SCOPE is ${carbonTable.getSortScope} ")
}
}
}
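In short, a sorted SORT_SCOPE must always be paired with non-empty sort
columns, whether they come from the same ALTER statement or from the table's
existing schema. Illustrative calls, mirroring the tests at the top of this
patch; `spark` is an assumed SparkSession and `t` an existing carbon table:

  // accepted: scope and columns stay consistent
  spark.sql("alter table t set tblproperties('sort_scope'='local_sort', 'sort_columns'='c1')")
  // rejected by branch 1: a sorted scope with blank SORT_COLUMNS
  spark.sql("alter table t set tblproperties('sort_scope'='local_sort', 'sort_columns'=' ')")
  // rejected by branch 2: blanking SORT_COLUMNS while the table scope is still sorted
  spark.sql("alter table t set tblproperties('sort_columns'='')")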