You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/12/17 14:50:18 UTC
carbondata git commit: [CARBONDATA-3118] support parallel block
pruning for non-default datamaps
Repository: carbondata
Updated Branches:
refs/heads/master 8fd449c89 -> 38abb6bd9
[CARBONDATA-3118] support parallel block pruning for non-default datamaps
Changes in this PR:
BlockDataMap and BlockletDataMap can store information about multiple files; each file is one row in that datamap. Non-default datamaps are not structured that way, so for the default datamaps the multi-thread work distribution happens based on the number of entries in the datamaps, while for non-default datamaps the distribution is based on the number of datamaps (one datamap is considered as one record for non-default datamaps).
So, this PR adds support for multi-threaded block pruning for non-default datamaps.
This closes #2949
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/38abb6bd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/38abb6bd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/38abb6bd
Branch: refs/heads/master
Commit: 38abb6bd9c1dd758391d50361b83135db3866ad2
Parents: 8fd449c
Author: ajantha-bhat <aj...@gmail.com>
Authored: Fri Nov 23 18:52:07 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon Dec 17 20:20:07 2018 +0530
----------------------------------------------------------------------
.../carbondata/core/datamap/TableDataMap.java | 55 ++++++++++++--------
.../carbondata/core/datamap/dev/DataMap.java | 7 +++
.../dev/cgdatamap/CoarseGrainDataMap.java | 5 ++
.../datamap/dev/fgdatamap/FineGrainDataMap.java | 5 ++
.../indexstore/blockletindex/BlockDataMap.java | 14 +++++
.../datamap/examples/MinMaxIndexDataMap.java | 4 ++
.../testsuite/datamap/CGDataMapTestCase.scala | 2 +
.../testsuite/datamap/FGDataMapTestCase.scala | 2 +
8 files changed, 71 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 06d2cab..4de7449 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -41,7 +41,6 @@ import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.scan.expression.Expression;
@@ -145,23 +144,11 @@ public final class TableDataMap extends OperationEventListener {
// for filter queries
int totalFiles = 0;
int datamapsCount = 0;
- int filesCountPerDatamap;
- boolean isBlockDataMapType = true;
for (Segment segment : segments) {
for (DataMap dataMap : dataMaps.get(segment)) {
- if (!(dataMap instanceof BlockDataMap)) {
- isBlockDataMapType = false;
- break;
- }
- filesCountPerDatamap = ((BlockDataMap) dataMap).getTotalBlocks();
- // old legacy store can give 0, so consider one datamap as 1 record.
- totalFiles += (filesCountPerDatamap == 0) ? 1 : filesCountPerDatamap;
+ totalFiles += dataMap.getNumberOfEntries();
datamapsCount++;
}
- if (!isBlockDataMapType) {
- // totalFiles fill be 0 for non-BlockDataMap Type. ex: lucene, bloom datamap. use old flow.
- break;
- }
}
int numOfThreadsForPruning = getNumOfThreadsForPruning();
if (numOfThreadsForPruning == 1 || datamapsCount < numOfThreadsForPruning || totalFiles
@@ -208,6 +195,30 @@ public final class TableDataMap extends OperationEventListener {
final FilterResolverIntf filterExp, final List<PartitionSpec> partitions,
List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> dataMaps,
int totalFiles) {
+ /*
+ *********************************************************************************
+ * Below is the example of how this part of code works.
+ * consider a scenario of having 5 segments, 10 datamaps in each segment,
+ * and each datamap has one record. So total 50 records.
+ *
+ * Datamaps in each segment looks like below.
+ * s0 [0-9], s1 [0-9], s2 [0-9], s3[0-9], s4[0-9]
+ *
+ * If number of threads are 4. so filesPerEachThread = 50/4 = 12 files per each thread.
+ *
+ * SegmentDataMapGroup look like below: [SegmentId, fromIndex, toIndex]
+ * In each segment only those datamaps are processed between fromIndex and toIndex.
+ *
+ * Final result will be: (4 list created as numOfThreadsForPruning is 4)
+ * Thread1 list: s0 [0-9], s1 [0-1] : 12 files
+ * Thread2 list: s1 [2-9], s2 [0-3] : 12 files
+ * Thread3 list: s2 [4-9], s3 [0-5] : 12 files
+ * Thread4 list: s3 [6-9], s4 [0-9] : 14 files
+ * so each thread will process almost equal number of records.
+ *
+ *********************************************************************************
+ */
+
int numOfThreadsForPruning = getNumOfThreadsForPruning();
LOG.info(
"Number of threads selected for multi-thread block pruning is " + numOfThreadsForPruning
@@ -216,24 +227,22 @@ public final class TableDataMap extends OperationEventListener {
int prev;
int filesCount = 0;
int processedFileCount = 0;
- int filesCountPerDatamap;
- List<List<SegmentDataMapGroup>> segmentList = new ArrayList<>(numOfThreadsForPruning);
+ List<List<SegmentDataMapGroup>> datamapListForEachThread =
+ new ArrayList<>(numOfThreadsForPruning);
List<SegmentDataMapGroup> segmentDataMapGroupList = new ArrayList<>();
for (Segment segment : segments) {
List<DataMap> eachSegmentDataMapList = dataMaps.get(segment);
prev = 0;
for (int i = 0; i < eachSegmentDataMapList.size(); i++) {
DataMap dataMap = eachSegmentDataMapList.get(i);
- filesCountPerDatamap = ((BlockDataMap) dataMap).getTotalBlocks();
- // old legacy store can give 0, so consider one datamap as 1 record.
- filesCount += (filesCountPerDatamap == 0) ? 1 : filesCountPerDatamap;
+ filesCount += dataMap.getNumberOfEntries();
if (filesCount >= filesPerEachThread) {
- if (segmentList.size() != numOfThreadsForPruning - 1) {
+ if (datamapListForEachThread.size() != numOfThreadsForPruning - 1) {
// not the last segmentList
segmentDataMapGroupList.add(new SegmentDataMapGroup(segment, prev, i));
// save the last value to process in next thread
prev = i + 1;
- segmentList.add(segmentDataMapGroupList);
+ datamapListForEachThread.add(segmentDataMapGroupList);
segmentDataMapGroupList = new ArrayList<>();
processedFileCount += filesCount;
filesCount = 0;
@@ -252,7 +261,7 @@ public final class TableDataMap extends OperationEventListener {
}
}
// adding the last segmentList data
- segmentList.add(segmentDataMapGroupList);
+ datamapListForEachThread.add(segmentDataMapGroupList);
processedFileCount += filesCount;
if (processedFileCount != totalFiles) {
// this should not happen
@@ -263,7 +272,7 @@ public final class TableDataMap extends OperationEventListener {
final ExecutorService executorService = Executors.newFixedThreadPool(numOfThreadsForPruning);
final String threadName = Thread.currentThread().getName();
for (int i = 0; i < numOfThreadsForPruning; i++) {
- final List<SegmentDataMapGroup> segmentDataMapGroups = segmentList.get(i);
+ final List<SegmentDataMapGroup> segmentDataMapGroups = datamapListForEachThread.get(i);
results.add(executorService.submit(new Callable<Void>() {
@Override public Void call() throws IOException {
Thread.currentThread().setName(threadName);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 47eeafe..f31b7f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -70,4 +70,11 @@ public interface DataMap<T extends Blocklet> {
*/
void finish();
+ /**
+ * Returns number of records information that are stored in datamap.
+ * Driver multi-thread block pruning happens based on the number of rows in datamap.
+ * So datamaps can have multiple rows if they store information of multiple files.
+ * so, this number of entries is used to represent how many files information a datamap contains
+ */
+ int getNumberOfEntries();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
index 25c4c94..fc1f104 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
@@ -40,4 +40,9 @@ public abstract class CoarseGrainDataMap implements DataMap<Blocklet> {
List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException {
throw new UnsupportedOperationException("Filter expression not supported");
}
+
+ @Override public int getNumberOfEntries() {
+ // keep default, one record in one datamap
+ return 1;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
index 7431742..a6732a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
@@ -39,4 +39,9 @@ public abstract class FineGrainDataMap implements DataMap<FineGrainBlocklet> {
List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException {
throw new UnsupportedOperationException("Filter expression not supported");
}
+
+ @Override public int getNumberOfEntries() {
+ // keep default, one record in one datamap
+ return 1;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 6ba0507..6b04cf7 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -1039,4 +1039,18 @@ public class BlockDataMap extends CoarseGrainDataMap
public int getSegmentPropertiesIndex() {
return segmentPropertiesIndex;
}
+
+ @Override public int getNumberOfEntries() {
+ if (memoryDMStore != null) {
+ if (memoryDMStore.getRowCount() == 0) {
+ // so that one datamap considered as one record
+ return 1;
+ } else {
+ return memoryDMStore.getRowCount();
+ }
+ } else {
+ // legacy store
+ return 1;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
index 510d87c..fb9c03e 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
@@ -175,4 +175,8 @@ public class MinMaxIndexDataMap extends CoarseGrainDataMap {
}
+ @Override public int getNumberOfEntries() {
+ // keep default, one record in one datamap
+ return 1;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 58f2542..a6bc30d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -251,6 +251,8 @@ class CGDataMap extends CoarseGrainDataMap {
override def finish() = {
???
}
+
+ override def getNumberOfEntries: Int = 1
}
class CGDataMapWriter(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/38abb6bd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index f553a9d..99e0509 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -274,6 +274,8 @@ class FGDataMap extends FineGrainDataMap {
override def finish() = {
}
+
+ override def getNumberOfEntries: Int = 1
}
class FGDataMapWriter(carbonTable: CarbonTable,