You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/06/06 02:34:52 UTC

[carbondata] branch master updated: [CARBONDATA-3350] Enhance custom compaction to resort old single segment by new sort_columns

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fa7fb4  [CARBONDATA-3350] Enhance custom compaction to resort old single segment by new sort_columns
6fa7fb4 is described below

commit 6fa7fb4f94ca3082113d0b47b109bdd16cf046a3
Author: QiangCai <qi...@qq.com>
AuthorDate: Wed May 15 16:46:20 2019 +0800

    [CARBONDATA-3350] Enhance custom compaction to resort old single segment by new sort_columns
    
    This closes #3202
---
 .../blockletindex/BlockletDataMapFactory.java      |   2 +-
 .../TableStatusReadCommittedScope.java             |   2 +-
 .../spark/rdd/CarbonTableCompactor.scala           |  21 +++-
 .../processing/merger/CarbonCompactionUtil.java    | 132 +++++++++++++++++----
 4 files changed, 128 insertions(+), 29 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 446507f..cab1b8b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -167,7 +167,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     return dataMaps;
   }
 
-  private Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
+  public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
       throws IOException {
     Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(segment.getSegmentNo());
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 5622efe..e4fd6f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -55,7 +55,7 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
   }
 
   public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
-      LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) throws IOException {
+      LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) {
     this.identifier = identifier;
     this.configuration = configuration;
     this.loadMetadataDetails = loadMetadataDetails;
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index afe2927..4c7dd95 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -29,13 +29,15 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa
 import org.apache.spark.util.MergeIndexUtil
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.MergeResultImpl
 
 /**
@@ -50,6 +52,21 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     operationContext: OperationContext)
   extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
 
+  private def needSortSingleSegment(
+      loadsToMerge: java.util.List[LoadMetadataDetails]): Boolean = {
+    // support re-sorting a single old segment that is not sorted by the current sort_columns
+    if (CompactionType.CUSTOM == compactionModel.compactionType &&
+        loadsToMerge.size() == 1 &&
+        SortScope.NO_SORT != compactionModel.carbonTable.getSortScope) {
+      !CarbonCompactionUtil.isSortedByCurrentSortColumns(
+        carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+        loadsToMerge.get(0),
+        FileFactory.getConfiguration)
+    } else {
+      false
+    }
+  }
+
   override def executeCompaction(): Unit = {
     val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
       carbonLoadModel.getLoadMetadataDetails
@@ -58,7 +75,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
 
     var loadsToMerge = identifySegmentsToBeMerged()
 
-    while (loadsToMerge.size() > 1 ||
+    while (loadsToMerge.size() > 1 || needSortSingleSegment(loadsToMerge) ||
            (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
             loadsToMerge.size() > 0)) {
       val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index c3017a7..8cf477e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -24,11 +24,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.carbondata.common.Strings;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
@@ -49,13 +52,16 @@ import org.apache.carbondata.core.scan.expression.conditional.GreaterThanExpress
 import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression;
 import org.apache.carbondata.core.scan.expression.logical.AndExpression;
 import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
 /**
@@ -612,6 +618,37 @@ public class CarbonCompactionUtil {
     return taskIdSet.size();
   }
 
+  private static boolean compareSortColumns(CarbonTable table, List<ColumnSchema> fileColumns) {
+    // When sort_columns is modified, the segment is also considered as no_sort.
+    List<CarbonDimension> sortColumnsOfSegment = new ArrayList<>();
+    for (ColumnSchema column : fileColumns) {
+      if (column.isDimensionColumn() && column.isSortColumn()) {
+        sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1));
+      }
+    }
+    if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) {
+      return false;
+    }
+    List<CarbonDimension> sortColumnsOfTable = new ArrayList<>();
+    for (CarbonDimension dimension : table.getDimensions()) {
+      if (dimension.isSortColumn()) {
+        sortColumnsOfTable.add(dimension);
+      }
+    }
+    int sortColumnNums = sortColumnsOfTable.size();
+    if (sortColumnsOfSegment.size() < sortColumnNums) {
+      return false;
+    }
+    // compare sort_columns
+    for (int i = 0; i < sortColumnNums; i++) {
+      if (!RestructureUtil.isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i),
+          sortColumnsOfSegment.get(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * Returns if the DataFileFooter containing carbondata file contains
    * sorted data or not.
@@ -622,37 +659,82 @@ public class CarbonCompactionUtil {
    */
   public static boolean isSortedByCurrentSortColumns(CarbonTable table, DataFileFooter footer) {
     if (footer.isSorted()) {
-      // When sort_columns is modified, it will be consider as no_sort also.
-      List<CarbonDimension> sortColumnsOfSegment = new ArrayList<>();
-      for (ColumnSchema column : footer.getColumnInTable()) {
-        if (column.isDimensionColumn() && column.isSortColumn()) {
-          sortColumnsOfSegment.add(new CarbonDimension(column, -1, -1, -1));
+      return compareSortColumns(table, footer.getColumnInTable());
+    } else {
+      return false;
+    }
+  }
+
+  public static boolean isSortedByCurrentSortColumns(
+      CarbonTable table, LoadMetadataDetails load, Configuration hadoopConf) {
+    List<String> sortColumnList = table.getSortColumns();
+    if (sortColumnList.isEmpty()) {
+      return false;
+    }
+    // table sort_columns
+    String sortColumns = Strings.mkString(
+        sortColumnList.toArray(new String[sortColumnList.size()]), ",");
+    String segmentPath =
+        CarbonTablePath.getSegmentPath(table.getTablePath(), load.getLoadName());
+    // segment sort_columns
+    String segmentSortColumns = getSortColumnsOfSegment(segmentPath);
+    if (segmentSortColumns == null) {
+      return false;
+    } else {
+      return segmentSortColumns.equalsIgnoreCase(sortColumns);
+    }
+  }
+
+  private static String mkSortColumnsString(
+      List<org.apache.carbondata.format.ColumnSchema> columnList) {
+    StringBuilder builder = new StringBuilder();
+    for (org.apache.carbondata.format.ColumnSchema column : columnList) {
+      if (column.isDimension()) {
+        Map<String, String> properties = column.getColumnProperties();
+        if (properties != null) {
+          if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
+            builder.append(column.column_name).append(",");
+          }
         }
       }
-      if (sortColumnsOfSegment.size() < table.getNumberOfSortColumns()) {
-        return false;
-      }
-      List<CarbonDimension> sortColumnsOfTable = new ArrayList<>();
-      for (CarbonDimension dimension : table.getDimensions()) {
-        if (dimension.isSortColumn()) {
-          sortColumnsOfTable.add(dimension);
+    }
+    if (builder.length() > 1) {
+      return builder.substring(0, builder.length() - 1);
+    } else {
+      return null;
+    }
+  }
+
+  public static String getSortColumnsOfSegment(String segmentFolder) {
+    CarbonFile[] files = SegmentIndexFileStore.getCarbonIndexFiles(
+        segmentFolder, FileFactory.getConfiguration());
+    Set<Boolean> isSortSet = new HashSet<>();
+    Set<String> sortColumnsSet = new HashSet<>();
+    if (files != null) {
+      for (CarbonFile file : files) {
+        IndexHeader indexHeader = SegmentIndexFileStore.readIndexHeader(
+            file.getCanonicalPath(), FileFactory.getConfiguration());
+        if (indexHeader != null) {
+          if (indexHeader.isSetIs_sort()) {
+            isSortSet.add(indexHeader.is_sort);
+            if (indexHeader.is_sort) {
+              sortColumnsSet.add(mkSortColumnsString(indexHeader.getTable_columns()));
+            }
+          } else {
+            // if is_sort is not set, it is an old store and is considered local_sort by default.
+            sortColumnsSet.add(mkSortColumnsString(indexHeader.getTable_columns()));
+          }
         }
-      }
-      int sortColumnNums = sortColumnsOfTable.size();
-      if (sortColumnsOfSegment.size() < sortColumnNums) {
-        return false;
-      }
-      // compare sort_columns
-      for (int i = 0; i < sortColumnNums; i++) {
-        if (!RestructureUtil
-            .isColumnMatches(table.isTransactionalTable(), sortColumnsOfTable.get(i),
-                sortColumnsOfSegment.get(i))) {
-          return false;
+        if (isSortSet.size() >= 2 || sortColumnsSet.size() >= 2) {
+          break;
         }
       }
-      return true;
+    }
+    // for all index files, sort_columns should be the same
+    if (isSortSet.size() <= 1 && sortColumnsSet.size() == 1) {
+      return sortColumnsSet.iterator().next();
     } else {
-      return false;
+      return null;
     }
   }