Posted to commits@carbondata.apache.org by xu...@apache.org on 2018/10/25 06:20:14 UTC

carbondata git commit: [CARBONDATA-3040][BloomDataMap] Fix bug for merging bloom index

Repository: carbondata
Updated Branches:
  refs/heads/master e4806b9a0 -> 33a6dc2ac


[CARBONDATA-3040][BloomDataMap] Fix bug for merging bloom index

Problem
There is a bug that causes query failures when we create two bloom datamaps on the same table that already contains data.

Analysis
Since the table already has data, each CREATE DATAMAP triggers a rebuild of that datamap, which in turn triggers bloom index file merging. By debugging, we found that the first datamap's bloom index files were merged twice, and the second merge left the bloom index files empty.

The procedure is as follows:

  1. create table
  2. load data
  3. create bloom datamap1: rebuild datamap1 for existing data; the event listener is triggered to merge index files for all bloom datamaps (currently only datamap1)
  4. create bloom datamap2: rebuild datamap2 for existing data; the event listener is triggered to merge index files for all bloom datamaps (currently datamap1 and datamap2)

Because the event carries no information about which datamap was rebuilt, the listener always merges the index files of all bloom datamaps. So datamap1's bloom index files get merged twice; by the time the second merge runs, only the mergeShard folder remains, leaving no input files for the merge, and the final merged bloom index files come out empty.
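
To make the failure mode concrete, below is a self-contained Scala sketch that simulates the merge against an in-memory "segment". The map and the mergeBloomIndex function are illustrative stand-ins, not the actual BloomIndexFileStore code.

  import scala.collection.mutable

  object DoubleMergeDemo {
    // keys are folder names, values are bloom index contents;
    // "mergeShard" holds the merged output
    def mergeBloomIndex(segment: mutable.Map[String, String]): Unit = {
      val shards = segment.keys.filterNot(_ == "mergeShard").toSeq.sorted
      val merged = shards.map(segment).mkString  // merge contents of all shards
      shards.foreach(segment.remove)             // shard folders are gone after merging
      segment("mergeShard") = merged             // mergeShard is rewritten unconditionally
    }

    def main(args: Array[String]): Unit = {
      val segment = mutable.Map("shard_0" -> "a", "shard_1" -> "b")
      mergeBloomIndex(segment)          // first rebuild event merges correctly
      println(segment("mergeShard"))    // prints "ab"
      mergeBloomIndex(segment)          // second rebuild event hits the same datamap
      println(segment("mergeShard"))    // prints "" -- the merged index is now empty
    }
  }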

Solution
Send the datamap name in the rebuild event so the listener can filter on it and merge bloom index files only for that specific datamap. Also add a file check for whether mergeShard already exists before merging.
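
A minimal Scala sketch of the selection logic this implies; BuildEvent and BloomDataMap below are simplified stand-ins for the actual event and datamap classes:

  // dmName is null when the event comes from loading or compaction
  case class BuildEvent(dmName: String, isFromRebuild: Boolean)
  case class BloomDataMap(name: String, isLazy: Boolean)

  def dataMapsToMerge(event: BuildEvent, all: Seq[BloomDataMap]): Seq[BloomDataMap] = {
    if (event.isFromRebuild) {
      // rebuild: merge only the datamap named in the event, if a name was sent
      if (event.dmName != null) all.filter(_.name.equalsIgnoreCase(event.dmName)) else all
    } else {
      // load: skip lazy datamaps, as before
      all.filterNot(_.isLazy)
    }
  }

The mergeShard existence check lands in BloomIndexFileStore.mergeBloomIndexFile (the "Step 1. check current folders" comment in the diff below).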

This closes #2851


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/33a6dc2a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/33a6dc2a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/33a6dc2a

Branch: refs/heads/master
Commit: 33a6dc2ac996cbb0bfb4f354d7fc80b297d652bb
Parents: e4806b9
Author: Manhua <ke...@qq.com>
Authored: Wed Oct 24 16:20:13 2018 +0800
Committer: xuchuanyin <xu...@hust.edu.cn>
Committed: Thu Oct 25 14:17:29 2018 +0800

----------------------------------------------------------------------
 .../datamap/bloom/BloomIndexFileStore.java      | 20 +++++++++++++++-----
 .../carbondata/events/DataMapEvents.scala       | 10 +++++++++-
 .../datamap/IndexDataMapRebuildRDD.scala        |  2 +-
 .../spark/rdd/CarbonTableCompactor.scala        |  2 +-
 .../events/MergeBloomIndexEventListener.scala   | 10 ++++++++--
 .../management/CarbonLoadDataCommand.scala      |  2 +-
 6 files changed, 35 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/33a6dc2a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java
index 17813ba..3d6ad9b 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java
@@ -60,6 +60,9 @@ public class BloomIndexFileStore {
 
 
   public static void mergeBloomIndexFile(String dmSegmentPathString, List<String> indexCols) {
+
+    // Step 1. check current folders
+
     // get all shard paths of old store
     CarbonFile segmentPath = FileFactory.getCarbonFile(dmSegmentPathString,
             FileFactory.getFileType(dmSegmentPathString));
@@ -72,6 +75,9 @@ public class BloomIndexFileStore {
 
     String mergeShardPath = dmSegmentPathString + File.separator + MERGE_BLOOM_INDEX_SHARD_NAME;
     String mergeInprogressFile = dmSegmentPathString + File.separator + MERGE_INPROGRESS_FILE;
+
+    // Step 2. prepare for fail-safe merging
+
     try {
       // delete mergeShard folder if exists
       if (FileFactory.isFileExist(mergeShardPath)) {
@@ -87,10 +93,12 @@ public class BloomIndexFileStore {
         throw new RuntimeException("Failed to create directory " + mergeShardPath);
       }
     } catch (IOException e) {
-      LOGGER.error("Error occurs while create directory " + mergeShardPath, e);
-      throw new RuntimeException("Error occurs while create directory " + mergeShardPath);
+      throw new RuntimeException(e);
     }
 
+    // Step 3. merge index files
+    // Query won't use mergeShard until MERGE_INPROGRESS_FILE is deleted
+
     // for each index column, merge the bloomindex files from all shards into one
     for (String indexCol: indexCols) {
       String mergeIndexFile = getMergeBloomIndexFile(mergeShardPath, indexCol);
@@ -115,15 +123,17 @@ public class BloomIndexFileStore {
         }
       } catch (IOException e) {
         LOGGER.error("Error occurs while merge bloom index file of column: " + indexCol, e);
-        // delete merge shard of bloom index for this segment when failed
+        // if any column failed, delete merge shard for this segment and exit
         FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(mergeShardPath));
+        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(mergeInprogressFile));
         throw new RuntimeException(
-            "Error occurs while merge bloom index file of column: " + indexCol);
+            "Error occurs while merge bloom index file of column: " + indexCol, e);
       } finally {
         CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
       }
     }
-    // delete flag file and mergeShard can be used
+
+    // Step 4. delete flag file and mergeShard can be used
     try {
       FileFactory.deleteFile(mergeInprogressFile, FileFactory.getFileType(mergeInprogressFile));
     } catch (IOException e) {
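
The comment "Query won't use mergeShard until MERGE_INPROGRESS_FILE is deleted" describes a flag-file protocol. A reader-side check might look like the Scala sketch below; the flag name is a placeholder for the MERGE_INPROGRESS_FILE constant, and the real code goes through FileFactory/CarbonFile rather than java.nio:

  import java.nio.file.{Files, Paths}

  object MergeShardGuard {
    // placeholder for BloomIndexFileStore.MERGE_INPROGRESS_FILE
    private val InProgressFlag = "mergeBloomIndex.inprogress"

    def mergeShardUsable(dmSegmentPath: String): Boolean = {
      val mergeShard = Paths.get(dmSegmentPath, "mergeShard")
      val inProgress = Paths.get(dmSegmentPath, InProgressFlag)
      // usable only after merging finished and the flag file was deleted
      Files.isDirectory(mergeShard) && !Files.exists(inProgress)
    }
  }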

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33a6dc2a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
index 503729a..06da3d3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
@@ -58,9 +58,17 @@ case class BuildDataMapPreExecutionEvent(sparkSession: SparkSession,
 /**
  * For handling operation's after finish of index build over table with index datamap
  * example: bloom datamap, Lucene datamap
+ *
+ * @param sparkSession
+ * @param identifier
+ * @param dmName set to specify datamap name in rebuild process;
+ *               set to Null in loading and compaction and it will deal all datamaps
+ * @param segmentIdList
+ * @param isFromRebuild set to false in loading process for skipping lazy datamap
  */
 case class BuildDataMapPostExecutionEvent(sparkSession: SparkSession,
-    identifier: AbsoluteTableIdentifier, segmentIdList: Seq[String], isFromRebuild: Boolean)
+    identifier: AbsoluteTableIdentifier, dmName: String,
+    segmentIdList: Seq[String], isFromRebuild: Boolean)
   extends Event with TableEventInfo
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33a6dc2a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 3f486d0..a35de58 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -133,7 +133,7 @@ object IndexDataMapRebuildRDD {
     }
 
     val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession,
-      tableIdentifier, validSegments.asScala.map(_.getSegmentNo), true)
+      tableIdentifier, schema.getDataMapName, validSegments.asScala.map(_.getSegmentNo), true)
     OperationListenerBus.getInstance().fireEvent(buildDataMapPostExecutionEvent, operationContext)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33a6dc2a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 756d30c..ac83212 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -289,7 +289,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       if (null != tableDataMaps) {
         val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(
           sqlContext.sparkSession, carbonTable.getAbsoluteTableIdentifier,
-          Seq(carbonLoadModel.getSegmentId), true)
+          null, Seq(mergedLoadNumber), true)
         OperationListenerBus.getInstance()
           .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33a6dc2a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
index 2d4fe84..ea0f9e7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
@@ -48,8 +48,14 @@ class MergeBloomIndexEventListener extends OperationEventListener with Logging {
           _.getDataMapSchema.getProviderName.equalsIgnoreCase(
             DataMapClassProvider.BLOOMFILTER.getShortName))
 
-        // for load process, filter lazy datamap
-        if (!datamapPostEvent.isFromRebuild) {
+        if (datamapPostEvent.isFromRebuild) {
+          if (null != datamapPostEvent.dmName) {
+            // for rebuild process
+            bloomDatamaps = bloomDatamaps.filter(
+              _.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName))
+          }
+        } else {
+          // for load process, skip lazy datamap
           bloomDatamaps = bloomDatamaps.filter(!_.getDataMapSchema.isLazy)
         }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33a6dc2a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index f8077ae..29cc6a9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -332,7 +332,7 @@ case class CarbonLoadDataCommand(
         OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
         if (tableDataMaps.size() > 0) {
           val buildDataMapPostExecutionEvent = BuildDataMapPostExecutionEvent(sparkSession,
-            table.getAbsoluteTableIdentifier, Seq(carbonLoadModel.getSegmentId), false)
+            table.getAbsoluteTableIdentifier, null, Seq(carbonLoadModel.getSegmentId), false)
           OperationListenerBus.getInstance()
             .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
         }