You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/12/17 14:50:18 UTC

carbondata git commit: [CARBONDATA-3118] support parallel block pruning for non-default datamaps

Repository: carbondata
Updated Branches:
  refs/heads/master 8fd449c89 -> 38abb6bd9


[CARBONDATA-3118] support parallel block pruning for non-default datamaps

Changes in this PR:
BlockDataMap and BlockletDataMap can store information about multiple files; each file is one row in the datamap. Non-default datamaps are not structured like that. So, for default datamaps, the distribution of work across threads is based on the number of entries in the datamaps, whereas for non-default datamaps the distribution is based on the number of datamaps (one datamap is considered as one record for non-default datamaps).

Therefore, this PR adds support for multi-threaded block pruning for non-default datamaps.

This closes #2949


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/38abb6bd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/38abb6bd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/38abb6bd

Branch: refs/heads/master
Commit: 38abb6bd9c1dd758391d50361b83135db3866ad2
Parents: 8fd449c
Author: ajantha-bhat <aj...@gmail.com>
Authored: Fri Nov 23 18:52:07 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon Dec 17 20:20:07 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/TableDataMap.java   | 55 ++++++++++++--------
 .../carbondata/core/datamap/dev/DataMap.java    |  7 +++
 .../dev/cgdatamap/CoarseGrainDataMap.java       |  5 ++
 .../datamap/dev/fgdatamap/FineGrainDataMap.java |  5 ++
 .../indexstore/blockletindex/BlockDataMap.java  | 14 +++++
 .../datamap/examples/MinMaxIndexDataMap.java    |  4 ++
 .../testsuite/datamap/CGDataMapTestCase.scala   |  2 +
 .../testsuite/datamap/FGDataMapTestCase.scala   |  2 +
 8 files changed, 71 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 06d2cab..4de7449 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -41,7 +41,6 @@ import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.scan.expression.Expression;
@@ -145,23 +144,11 @@ public final class TableDataMap extends OperationEventListener {
     // for filter queries
     int totalFiles = 0;
     int datamapsCount = 0;
-    int filesCountPerDatamap;
-    boolean isBlockDataMapType = true;
     for (Segment segment : segments) {
       for (DataMap dataMap : dataMaps.get(segment)) {
-        if (!(dataMap instanceof BlockDataMap)) {
-          isBlockDataMapType = false;
-          break;
-        }
-        filesCountPerDatamap = ((BlockDataMap) dataMap).getTotalBlocks();
-        // old legacy store can give 0, so consider one datamap as 1 record.
-        totalFiles += (filesCountPerDatamap == 0) ? 1 : filesCountPerDatamap;
+        totalFiles += dataMap.getNumberOfEntries();
         datamapsCount++;
       }
-      if (!isBlockDataMapType) {
-        // totalFiles fill be 0 for non-BlockDataMap Type. ex: lucene, bloom datamap. use old flow.
-        break;
-      }
     }
     int numOfThreadsForPruning = getNumOfThreadsForPruning();
     if (numOfThreadsForPruning == 1 || datamapsCount < numOfThreadsForPruning || totalFiles
@@ -208,6 +195,30 @@ public final class TableDataMap extends OperationEventListener {
       final FilterResolverIntf filterExp, final List<PartitionSpec> partitions,
       List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> dataMaps,
       int totalFiles) {
+    /*
+     *********************************************************************************
+     * Below is the example of how this part of code works.
+     * consider a scenario of having 5 segments, 10 datamaps in each segment,
+     * and each datamap has one record. So total 50 records.
+     *
+     * Datamaps in each segment looks like below.
+     * s0 [0-9], s1 [0-9], s2 [0-9], s3[0-9], s4[0-9]
+     *
+     * If number of threads are 4. so filesPerEachThread = 50/4 = 12 files per each thread.
+     *
+     * SegmentDataMapGroup look like below: [SegmentId, fromIndex, toIndex]
+     * In each segment only those datamaps are processed between fromIndex and toIndex.
+     *
+     * Final result will be: (4 list created as numOfThreadsForPruning is 4)
+     * Thread1 list: s0 [0-9], s1 [0-1]  : 12 files
+     * Thread2 list: s1 [2-9], s2 [0-3]  : 12 files
+     * Thread3 list: s2 [4-9], s3 [0-5]  : 12 files
+     * Thread4 list: s3 [6-9], s4 [0-9]  : 14 files
+     * so each thread will process almost equal number of records.
+     *
+     *********************************************************************************
+     */
+
     int numOfThreadsForPruning = getNumOfThreadsForPruning();
     LOG.info(
         "Number of threads selected for multi-thread block pruning is " + numOfThreadsForPruning
@@ -216,24 +227,22 @@ public final class TableDataMap extends OperationEventListener {
     int prev;
     int filesCount = 0;
     int processedFileCount = 0;
-    int filesCountPerDatamap;
-    List<List<SegmentDataMapGroup>> segmentList = new ArrayList<>(numOfThreadsForPruning);
+    List<List<SegmentDataMapGroup>> datamapListForEachThread =
+        new ArrayList<>(numOfThreadsForPruning);
     List<SegmentDataMapGroup> segmentDataMapGroupList = new ArrayList<>();
     for (Segment segment : segments) {
       List<DataMap> eachSegmentDataMapList = dataMaps.get(segment);
       prev = 0;
       for (int i = 0; i < eachSegmentDataMapList.size(); i++) {
         DataMap dataMap = eachSegmentDataMapList.get(i);
-        filesCountPerDatamap = ((BlockDataMap) dataMap).getTotalBlocks();
-        // old legacy store can give 0, so consider one datamap as 1 record.
-        filesCount += (filesCountPerDatamap == 0) ? 1 : filesCountPerDatamap;
+        filesCount += dataMap.getNumberOfEntries();
         if (filesCount >= filesPerEachThread) {
-          if (segmentList.size() != numOfThreadsForPruning - 1) {
+          if (datamapListForEachThread.size() != numOfThreadsForPruning - 1) {
             // not the last segmentList
             segmentDataMapGroupList.add(new SegmentDataMapGroup(segment, prev, i));
             // save the last value to process in next thread
             prev = i + 1;
-            segmentList.add(segmentDataMapGroupList);
+            datamapListForEachThread.add(segmentDataMapGroupList);
             segmentDataMapGroupList = new ArrayList<>();
             processedFileCount += filesCount;
             filesCount = 0;
@@ -252,7 +261,7 @@ public final class TableDataMap extends OperationEventListener {
       }
     }
     // adding the last segmentList data
-    segmentList.add(segmentDataMapGroupList);
+    datamapListForEachThread.add(segmentDataMapGroupList);
     processedFileCount += filesCount;
     if (processedFileCount != totalFiles) {
       // this should not happen
@@ -263,7 +272,7 @@ public final class TableDataMap extends OperationEventListener {
     final ExecutorService executorService = Executors.newFixedThreadPool(numOfThreadsForPruning);
     final String threadName = Thread.currentThread().getName();
     for (int i = 0; i < numOfThreadsForPruning; i++) {
-      final List<SegmentDataMapGroup> segmentDataMapGroups = segmentList.get(i);
+      final List<SegmentDataMapGroup> segmentDataMapGroups = datamapListForEachThread.get(i);
       results.add(executorService.submit(new Callable<Void>() {
         @Override public Void call() throws IOException {
           Thread.currentThread().setName(threadName);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 47eeafe..f31b7f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -70,4 +70,11 @@ public interface DataMap<T extends Blocklet> {
    */
   void finish();
 
+  /**
+   * Returns number of records information that are stored in datamap.
+   * Driver multi-thread block pruning happens based on the number of rows in datamap.
+   * So datamaps can have multiple rows if they store information of multiple files.
+   * so, this number of entries is used to represent how many files information a datamap contains
+   */
+  int getNumberOfEntries();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
index 25c4c94..fc1f104 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
@@ -40,4 +40,9 @@ public abstract class CoarseGrainDataMap implements DataMap<Blocklet> {
       List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException {
     throw new UnsupportedOperationException("Filter expression not supported");
   }
+
+  @Override public int getNumberOfEntries() {
+    // keep default, one record in one datamap
+    return 1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
index 7431742..a6732a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
@@ -39,4 +39,9 @@ public abstract class FineGrainDataMap implements DataMap<FineGrainBlocklet> {
       List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException {
     throw new UnsupportedOperationException("Filter expression not supported");
   }
+
+  @Override public int getNumberOfEntries() {
+    // keep default, one record in one datamap
+    return 1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 6ba0507..6b04cf7 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -1039,4 +1039,18 @@ public class BlockDataMap extends CoarseGrainDataMap
   public int getSegmentPropertiesIndex() {
     return segmentPropertiesIndex;
   }
+
+  @Override public int getNumberOfEntries() {
+    if (memoryDMStore != null) {
+      if (memoryDMStore.getRowCount() == 0) {
+        // so that one datamap considered as one record
+        return 1;
+      } else {
+        return memoryDMStore.getRowCount();
+      }
+    } else {
+      // legacy store
+      return 1;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
index 510d87c..fb9c03e 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
@@ -175,4 +175,8 @@ public class MinMaxIndexDataMap extends CoarseGrainDataMap {
 
   }
 
+  @Override public int getNumberOfEntries() {
+    // keep default, one record in one datamap
+    return 1;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 58f2542..a6bc30d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -251,6 +251,8 @@ class CGDataMap extends CoarseGrainDataMap {
   override def finish() = {
     ???
   }
+
+  override def getNumberOfEntries: Int = 1
 }
 
 class CGDataMapWriter(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index f553a9d..99e0509 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -274,6 +274,8 @@ class FGDataMap extends FineGrainDataMap {
   override def finish() = {
 
   }
+
+  override def getNumberOfEntries: Int = 1
 }
 
 class FGDataMapWriter(carbonTable: CarbonTable,