You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/01 06:35:21 UTC

carbondata git commit: [CARBONDATA-2092] Fix compaction bug to prevent the compaction flow from going through the restructure compaction flow

Repository: carbondata
Updated Branches:
  refs/heads/master f34ea5c70 -> 7ed144c53


[CARBONDATA-2092] Fix compaction bug to prevent the compaction flow from going through the restructure compaction flow

Problem and analysis:
During data load, the current schema timestamp is written to the carbondata file header. It is used during compaction to decide whether a block is a restructured block or conforms to the latest schema.
As the blocklet information is now stored in the index file, the carbondata file header is not read while loading it into memory, and as a result the schema timestamp is not set in the blocklet information. Because of this, during the compaction flow there is a mismatch when comparing the current schema timestamp with the timestamp stored in the block, and the flow goes through the restructure compaction flow instead of the normal compaction flow.

Impact:
Compaction performance degradation as restructure compaction flow involves sorting of data again.

Solution:
Modified the code so that the compaction flow goes through the restructure compaction flow only when a restructure operation (add or drop column) has actually been performed. The schema timestamp is now written to the index file header and read back when the blocklet information is loaded.

This closes #1875


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7ed144c5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7ed144c5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7ed144c5

Branch: refs/heads/master
Commit: 7ed144c537b48353de1ee8bf710c884d555c01ce
Parents: f34ea5c
Author: manishgupta88 <to...@gmail.com>
Authored: Tue Jan 23 21:12:39 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Feb 1 12:05:01 2018 +0530

----------------------------------------------------------------------
 .../util/AbstractDataFileFooterConverter.java   |  4 ++++
 .../core/util/CarbonMetadataUtil.java           |  6 ++++-
 .../apache/carbondata/core/util/CarbonUtil.java | 24 +++++++++++++++++---
 .../core/util/CarbonMetadataUtilTest.java       |  3 ++-
 format/src/main/thrift/carbondata_index.thrift  |  1 +
 .../carbondata/hadoop/CarbonInputSplit.java     |  2 ++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  3 +++
 .../CarbonGetTableDetailComandTestCase.scala    |  5 ++--
 .../processing/merger/CarbonCompactionUtil.java | 11 ++++++++-
 .../store/writer/AbstractFactDataWriter.java    |  3 ++-
 10 files changed, 53 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ed144c5/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 5ebf4cf..c7bc6aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -165,6 +165,10 @@ public abstract class AbstractDataFileFooterConverter {
         dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo));
         dataFileFooter.setSegmentInfo(segmentInfo);
         dataFileFooter.setVersionId(tableBlockInfo.getVersion());
+        // In case of old schema time stamp will not be found in the index header
+        if (readIndexHeader.isSetSchema_time_stamp()) {
+          dataFileFooter.setSchemaUpdatedTimeStamp(readIndexHeader.getSchema_time_stamp());
+        }
         if (readBlockIndexInfo.isSetBlocklet_info()) {
           List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
           BlockletInfo blockletInfo = new DataFileFooterConverterV3()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ed144c5/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 0ca0df8..9880b4d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -234,10 +234,12 @@ public class CarbonMetadataUtil {
    *
    * @param columnCardinality cardinality of each column
    * @param columnSchemaList  list of column present in the table
+   * @param bucketNumber
+   * @param schemaTimeStamp current timestamp of schema
    * @return Index header object
    */
   public static IndexHeader getIndexHeader(int[] columnCardinality,
-      List<ColumnSchema> columnSchemaList, int bucketNumber) {
+      List<ColumnSchema> columnSchemaList, int bucketNumber, long schemaTimeStamp) {
     // create segment info object
     SegmentInfo segmentInfo = new SegmentInfo();
     // set the number of columns
@@ -254,6 +256,8 @@ public class CarbonMetadataUtil {
     indexHeader.setTable_columns(columnSchemaList);
     // set the bucket number
     indexHeader.setBucket_id(bucketNumber);
+    // set the current schema time stamp which will used for deciding the restructured block
+    indexHeader.setSchema_time_stamp(schemaTimeStamp);
     return indexHeader;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ed144c5/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 600b1c9..e060c84 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -968,8 +968,27 @@ public final class CarbonUtil {
    * Below method will be used to read the data file matadata
    */
   public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo) throws IOException {
+    return getDataFileFooter(tableBlockInfo, false);
+  }
+
+  /**
+   * Below method will be used to read the data file matadata
+   *
+   * @param tableBlockInfo
+   * @param forceReadDataFileFooter flag to decide whether to read the footer of
+   *                                carbon data file forcefully
+   * @return
+   * @throws IOException
+   */
+  public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo,
+      boolean forceReadDataFileFooter) throws IOException {
+    return getDataFileFooter(tableBlockInfo, forceReadDataFileFooter);
+  }
+
+  private static DataFileFooter getDataFileFooter(TableBlockInfo tableBlockInfo,
+      boolean forceReadDataFileFooter) throws IOException {
     BlockletDetailInfo detailInfo = tableBlockInfo.getDetailInfo();
-    if (detailInfo == null) {
+    if (detailInfo == null || forceReadDataFileFooter) {
       AbstractDataFileFooterConverter fileFooterConverter =
           DataFileFooterConverterFactory.getInstance()
               .getDataFileFooterConverter(tableBlockInfo.getVersion());
@@ -977,8 +996,7 @@ public final class CarbonUtil {
     } else {
       DataFileFooter fileFooter = new DataFileFooter();
       fileFooter.setSchemaUpdatedTimeStamp(detailInfo.getSchemaUpdatedTimeStamp());
-      ColumnarFormatVersion version =
-          ColumnarFormatVersion.valueOf(detailInfo.getVersionNumber());
+      ColumnarFormatVersion version = ColumnarFormatVersion.valueOf(detailInfo.getVersionNumber());
       AbstractDataFileFooterConverter dataFileFooterConverter =
           DataFileFooterConverterFactory.getInstance().getDataFileFooterConverter(version);
       List<ColumnSchema> schema = dataFileFooterConverter.getSchema(tableBlockInfo);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ed144c5/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index 40463a6..da31ea3 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -168,7 +168,8 @@ public class CarbonMetadataUtilTest {
     indexHeader.setSegment_info(segmentInfo);
     indexHeader.setTable_columns(columnSchemaList);
     indexHeader.setBucket_id(0);
-    IndexHeader indexheaderResult = getIndexHeader(columnCardinality, columnSchemaList, 0);
+    indexHeader.setSchema_time_stamp(0L);
+    IndexHeader indexheaderResult = getIndexHeader(columnCardinality, columnSchemaList, 0, 0L);
     assertEquals(indexHeader, indexheaderResult);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ed144c5/format/src/main/thrift/carbondata_index.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata_index.thrift b/format/src/main/thrift/carbondata_index.thrift
index 60ec769..f77a256 100644
--- a/format/src/main/thrift/carbondata_index.thrift
+++ b/format/src/main/thrift/carbondata_index.thrift
@@ -31,6 +31,7 @@ struct IndexHeader{
   2: required list<schema.ColumnSchema> table_columns;	// Description of columns in this file
   3: required carbondata.SegmentInfo segment_info;	// Segment info (will be same/repeated for all files in this segment)
   4: optional i32 bucket_id; // Bucket number in which file contains
+  5: optional i64 schema_time_stamp; // Timestamp to compare column schema against master schema
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ed144c5/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index c1ef14d..5ab6605 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -181,6 +181,7 @@ public class CarbonInputSplit extends FileSplit
                 split.getSegmentId(), split.getLocations(), split.getLength(), blockletInfos,
                 split.getVersion(), split.getDeleteDeltaFiles());
         blockInfo.setDetailInfo(split.getDetailInfo());
+        blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset());
         tableBlockInfoList.add(blockInfo);
       } catch (IOException e) {
         throw new RuntimeException("fail to get location of split: " + split, e);
@@ -199,6 +200,7 @@ public class CarbonInputSplit extends FileSplit
               inputSplit.getLength(), blockletInfos, inputSplit.getVersion(),
               inputSplit.getDeleteDeltaFiles());
       blockInfo.setDetailInfo(inputSplit.getDetailInfo());
+      blockInfo.setBlockOffset(inputSplit.getDetailInfo().getBlockFooterOffset());
       return blockInfo;
     } catch (IOException e) {
       throw new RuntimeException("fail to get location of split: " + inputSplit, e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ed144c5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index c482a92..f37b0c5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -183,6 +183,7 @@ class CarbonMergerRDD[K, V](
           .checkIfAnyRestructuredBlockExists(segmentMapping,
             dataFileMetadataSegMapping,
             carbonTable.getTableLastUpdatedTime)
+        LOGGER.info(s"Restructured block exists: $restructuredBlockExists")
         DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
         exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
           carbonTable, dataFileMetadataSegMapping, restructuredBlockExists)
@@ -215,6 +216,7 @@ class CarbonMergerRDD[K, V](
         carbonLoadModel.setPartitionId("0")
         var processor: AbstractResultProcessor = null
         if (restructuredBlockExists) {
+          LOGGER.info("CompactionResultSortProcessor flow is selected")
           processor = new CompactionResultSortProcessor(
             carbonLoadModel,
             carbonTable,
@@ -223,6 +225,7 @@ class CarbonMergerRDD[K, V](
             factTableName,
             partitionNames)
         } else {
+          LOGGER.info("RowResultMergerProcessor flow is selected")
           processor =
             new RowResultMergerProcessor(
               databaseName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ed144c5/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
index 2d90063..6265d0d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -42,9 +42,10 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA
 
     assertResult(2)(result.length)
     assertResult("table_info1")(result(0).getString(0))
-    assertResult(2136)(result(0).getLong(1))
+    // 2143 is the size of carbon table
+    assertResult(2143)(result(0).getLong(1))
     assertResult("table_info2")(result(1).getString(0))
-    assertResult(2136)(result(1).getLong(1))
+    assertResult(2143)(result(1).getLong(1))
   }
 
   override def afterAll: Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ed144c5/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index d796262..2a69f0d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -119,7 +119,16 @@ public class CarbonCompactionUtil {
       DataFileFooter dataFileMatadata = null;
       // check if segId is already present in map
       List<DataFileFooter> metadataList = segmentBlockInfoMapping.get(segId);
-      dataFileMatadata = CarbonUtil.readMetadatFile(blockInfo);
+      // check to decide whether to read file footer of carbondata file forcefully. This will help
+      // in getting the schema last updated time based on which compaction flow is decided that
+      // whether it will go to restructure compaction flow or normal compaction flow.
+      // This decision will impact the compaction performance so it needs to be decided carefully
+      if (null != blockInfo.getDetailInfo()
+          && blockInfo.getDetailInfo().getSchemaUpdatedTimeStamp() == 0L) {
+        dataFileMatadata = CarbonUtil.readMetadatFile(blockInfo, true);
+      } else {
+        dataFileMatadata = CarbonUtil.readMetadatFile(blockInfo);
+      }
       if (null == metadataList) {
         // if it is not present
         eachSegmentBlocks.add(dataFileMatadata);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ed144c5/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index d1fc17b..c0b8065 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -417,7 +417,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   protected void writeIndexFile() throws IOException, CarbonDataWriterException {
     // get the header
     IndexHeader indexHeader = CarbonMetadataUtil
-        .getIndexHeader(localCardinality, thriftColumnSchemaList, model.getBucketId());
+        .getIndexHeader(localCardinality, thriftColumnSchemaList, model.getBucketId(),
+            model.getSchemaUpdatedTimeStamp());
     // get the block index info thrift
     List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
     // randomly choose a temp location for index file