You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/06/06 02:34:52 UTC
[carbondata] branch master updated: [CARBONDATA-3350] Enhance
custom compaction to resort old single segment by new sort_columns
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 6fa7fb4 [CARBONDATA-3350] Enhance custom compaction to resort old single segment by new sort_columns
6fa7fb4 is described below
commit 6fa7fb4f94ca3082113d0b47b109bdd16cf046a3
Author: QiangCai <qi...@qq.com>
AuthorDate: Wed May 15 16:46:20 2019 +0800
[CARBONDATA-3350] Enhance custom compaction to resort old single segment by new sort_columns
This closes #3202
---
.../blockletindex/BlockletDataMapFactory.java | 2 +-
.../TableStatusReadCommittedScope.java | 2 +-
.../spark/rdd/CarbonTableCompactor.scala | 21 +++-
.../processing/merger/CarbonCompactionUtil.java | 132 +++++++++++++++++----
4 files changed, 128 insertions(+), 29 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 446507f..cab1b8b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -167,7 +167,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
return dataMaps;
}
- private Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
+ public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
throws IOException {
Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
segmentMap.get(segment.getSegmentNo());
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 5622efe..e4fd6f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -55,7 +55,7 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
}
public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
- LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) throws IOException {
+ LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) {
this.identifier = identifier;
this.configuration = configuration;
this.loadMetadataDetails = loadMetadataDetails;
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index afe2927..4c7dd95 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -29,13 +29,15 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa
import org.apache.spark.util.MergeIndexUtil
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.MergeResultImpl
/**
@@ -50,6 +52,21 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
operationContext: OperationContext)
extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
+ private def needSortSingleSegment(
+ loadsToMerge: java.util.List[LoadMetadataDetails]): Boolean = {
+ // support to resort old segment with old sort_columns
+ if (CompactionType.CUSTOM == compactionModel.compactionType &&
+ loadsToMerge.size() == 1 &&
+ SortScope.NO_SORT != compactionModel.carbonTable.getSortScope) {
+ !CarbonCompactionUtil.isSortedByCurrentSortColumns(
+ carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+ loadsToMerge.get(0),
+ FileFactory.getConfiguration)
+ } else {
+ false
+ }
+ }
+
override def executeCompaction(): Unit = {
val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
carbonLoadModel.getLoadMetadataDetails
@@ -58,7 +75,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
var loadsToMerge = identifySegmentsToBeMerged()
- while (loadsToMerge.size() > 1 ||
+ while (loadsToMerge.size() > 1 || needSortSingleSegment(loadsToMerge) ||
(CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
loadsToMerge.size() > 0)) {
val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index c3017a7..8cf477e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -24,11 +24,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.carbondata.common.Strings;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
@@ -49,13 +52,16 @@ import org.apache.carbondata.core.scan.expression.conditional.GreaterThanExpress
import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression;
import org.apache.carbondata.core.scan.expression.logical.AndExpression;
import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.IndexHeader;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
/**
@@ -612,6 +618,37 @@ public class CarbonCompactionUtil {
return taskIdSet.size();
}
+ private static boolean compareSortColumns(CarbonTable table, List<ColumnSchema> fileColumns) {
+ // When sort_columns is modified, it will be considered as no_sort as well.
+ List<CarbonDimension> sortColumnsOfSegment = new ArrayList<>();
+ for (ColumnSchema column : fileColumns) {
+ if (column.isDimensionColumn() && column.isSortColumn()) {
+ sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1));
+ }
+ }
+ if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) {
+ return false;
+ }
+ List<CarbonDimension> sortColumnsOfTable = new ArrayList<>();
+ for (CarbonDimension dimension : table.getDimensions()) {
+ if (dimension.isSortColumn()) {
+ sortColumnsOfTable.add(dimension);
+ }
+ }
+ int sortColumnNums = sortColumnsOfTable.size();
+ if (sortColumnsOfSegment.size() < sortColumnNums) {
+ return false;
+ }
+ // compare sort_columns
+ for (int i = 0; i < sortColumnNums; i++) {
+ if (!RestructureUtil.isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i),
+ sortColumnsOfSegment.get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* Returns if the DataFileFooter containing carbondata file contains
* sorted data or not.
@@ -622,37 +659,82 @@ public class CarbonCompactionUtil {
*/
public static boolean isSortedByCurrentSortColumns(CarbonTable table, DataFileFooter footer) {
if (footer.isSorted()) {
- // When sort_columns is modified, it will be consider as no_sort also.
- List<CarbonDimension> sortColumnsOfSegment = new ArrayList<>();
- for (ColumnSchema column : footer.getColumnInTable()) {
- if (column.isDimensionColumn() && column.isSortColumn()) {
- sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1));
+ return compareSortColumns(table, footer.getColumnInTable());
+ } else {
+ return false;
+ }
+ }
+
+ public static boolean isSortedByCurrentSortColumns(
+ CarbonTable table, LoadMetadataDetails load, Configuration hadoopConf) {
+ List<String> sortColumnList = table.getSortColumns();
+ if (sortColumnList.isEmpty()) {
+ return false;
+ }
+ // table sort_columns
+ String sortColumns = Strings.mkString(
+ sortColumnList.toArray(new String[sortColumnList.size()]), ",");
+ String segmentPath =
+ CarbonTablePath.getSegmentPath(table.getTablePath(), load.getLoadName());
+ // segment sort_columns
+ String segmentSortColumns = getSortColumnsOfSegment(segmentPath);
+ if (segmentSortColumns == null) {
+ return false;
+ } else {
+ return segmentSortColumns.equalsIgnoreCase(sortColumns);
+ }
+ }
+
+ private static String mkSortColumnsString(
+ List<org.apache.carbondata.format.ColumnSchema> columnList) {
+ StringBuilder builder = new StringBuilder();
+ for (org.apache.carbondata.format.ColumnSchema column : columnList) {
+ if (column.isDimension()) {
+ Map<String, String> properties = column.getColumnProperties();
+ if (properties != null) {
+ if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
+ builder.append(column.column_name).append(",");
+ }
}
}
- if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) {
- return false;
- }
- List<CarbonDimension> sortColumnsOfTable = new ArrayList<>();
- for (CarbonDimension dimension : table.getDimensions()) {
- if (dimension.isSortColumn()) {
- sortColumnsOfTable.add(dimension);
+ }
+ if (builder.length() > 1) {
+ return builder.substring(0, builder.length() - 1);
+ } else {
+ return null;
+ }
+ }
+
+ public static String getSortColumnsOfSegment(String segmentFolder) {
+ CarbonFile[] files = SegmentIndexFileStore.getCarbonIndexFiles(
+ segmentFolder, FileFactory.getConfiguration());
+ Set<Boolean> isSortSet = new HashSet<>();
+ Set<String> sortColumnsSet = new HashSet<>();
+ if (files != null) {
+ for (CarbonFile file : files) {
+ IndexHeader indexHeader = SegmentIndexFileStore.readIndexHeader(
+ file.getCanonicalPath(), FileFactory.getConfiguration());
+ if (indexHeader != null) {
+ if (indexHeader.isSetIs_sort()) {
+ isSortSet.add(indexHeader.is_sort);
+ if (indexHeader.is_sort) {
+ sortColumnsSet.add(mkSortColumnsString(indexHeader.getTable_columns()));
+ }
+ } else {
+ // if is_sort is not set, it will be an old store and is considered as local_sort by default.
+ sortColumnsSet.add(mkSortColumnsString(indexHeader.getTable_columns()));
+ }
}
- }
- int sortColumnNums = sortColumnsOfTable.size();
- if (sortColumnsOfSegment.size() < sortColumnNums) {
- return false;
- }
- // compare sort_columns
- for (int i = 0; i < sortColumnNums; i++) {
- if (!RestructureUtil
- .isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i),
- sortColumnsOfSegment.get(i))) {
- return false;
+ if (isSortSet.size() >= 2 || sortColumnsSet.size() >= 2) {
+ break;
}
}
- return true;
+ }
+ // for all index files, sort_columns should be same
+ if (isSortSet.size() <= 1 && sortColumnsSet.size() == 1) {
+ return sortColumnsSet.iterator().next();
} else {
- return false;
+ return null;
}
}