You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/06/30 17:42:13 UTC
[26/50] [abbrv] incubator-carbondata git commit: [Bug] Executor Btree
loading performance improvement (#763)
[Bug] Executor Btree loading performance improvement (#763)
Make block-level Btree loading in the executor multi-threaded.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e6890929
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e6890929
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e6890929
Branch: refs/heads/master
Commit: e689092960045634ca3531918022d5030274eb1c
Parents: c6cafaa
Author: Kumar Vishal <ku...@gmail.com>
Authored: Tue Jun 28 00:09:23 2016 +0800
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Mon Jun 27 21:39:23 2016 +0530
----------------------------------------------------------------------
.../core/carbon/datastore/BlockIndexStore.java | 184 +++++++++++++------
1 file changed, 131 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6890929/core/src/main/java/org/carbondata/core/carbon/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/carbon/datastore/BlockIndexStore.java b/core/src/main/java/org/carbondata/core/carbon/datastore/BlockIndexStore.java
index c24e6a9..7ef7a24 100644
--- a/core/src/main/java/org/carbondata/core/carbon/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/carbondata/core/carbon/datastore/BlockIndexStore.java
@@ -23,7 +23,13 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.carbondata.common.logging.LogService;
import org.carbondata.common.logging.LogServiceFactory;
@@ -34,6 +40,7 @@ import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.carbondata.core.carbon.datastore.exception.IndexBuilderException;
import org.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.util.CarbonProperties;
import org.carbondata.core.util.CarbonUtil;
import org.carbondata.core.util.CarbonUtilException;
@@ -99,16 +106,24 @@ public class BlockIndexStore {
*/
public List<AbstractIndex> loadAndGetBlocks(List<TableBlockInfo> tableBlocksInfos,
AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException {
- List<AbstractIndex> loadedBlocksList =
- new ArrayList<AbstractIndex>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ AbstractIndex[] loadedBlock = new AbstractIndex[tableBlocksInfos.size()];
addTableLockObject(absoluteTableIdentifier);
- // sort the block infos
+ // sort the block info
// so block will be loaded in sorted order this will be required for
// query execution
Collections.sort(tableBlocksInfos);
// get the instance
Object lockObject = tableLockMap.get(absoluteTableIdentifier);
Map<TableBlockInfo, AbstractIndex> tableBlockMapTemp = null;
+ int numberOfCores = 1;
+ try {
+ numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.NUM_CORES,
+ CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+ } catch (NumberFormatException e) {
+ numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+ }
+ ExecutorService executor = Executors.newFixedThreadPool(numberOfCores);
// Acquire the lock to ensure only one query is loading the table blocks
// if same block is assigned to both the queries
synchronized (lockObject) {
@@ -120,63 +135,102 @@ public class BlockIndexStore {
}
}
AbstractIndex tableBlock = null;
- DataFileFooter footer = null;
- try {
- for (TableBlockInfo blockInfo : tableBlocksInfos) {
- // if table block is already loaded then do not load
- // that block
- tableBlock = tableBlockMapTemp.get(blockInfo);
- // if block is not loaded
- if (null == tableBlock) {
- // check any lock object is present in
- // block info lock map
- Object blockInfoLockObject = blockInfoLock.get(blockInfo);
- // if lock object is not present then acquire
- // the lock in block info lock and add a lock object in the map for
- // particular block info, added double checking mechanism to add the lock
- // object so in case of concurrent query we for same block info only one lock
- // object will be added
- if (null == blockInfoLockObject) {
- synchronized (blockInfoLock) {
- // again checking the block info lock, to check whether lock object is present
- // or not if now also not present then add a lock object
- blockInfoLockObject = blockInfoLock.get(blockInfo);
- if (null == blockInfoLockObject) {
- blockInfoLockObject = new Object();
- blockInfoLock.put(blockInfo, blockInfoLockObject);
- }
+ List<Future<AbstractIndex>> blocksList = new ArrayList<Future<AbstractIndex>>();
+ int counter = -1;
+ for (TableBlockInfo blockInfo : tableBlocksInfos) {
+ counter++;
+ // if table block is already loaded then do not load
+ // that block
+ tableBlock = tableBlockMapTemp.get(blockInfo);
+ // if block is not loaded
+ if (null == tableBlock) {
+ // check any lock object is present in
+ // block info lock map
+ Object blockInfoLockObject = blockInfoLock.get(blockInfo);
+ // if no lock object is present, synchronize on the block info lock
+ // map and add a lock object for this particular block info. The lock
+ // object is looked up again inside the synchronized section so that,
+ // with concurrent queries on the same block info, only one lock
+ // object is added
+ if (null == blockInfoLockObject) {
+ synchronized (blockInfoLock) {
+ // again checking the block info lock, to check whether lock object is present
+ // or not if now also not present then add a lock object
+ blockInfoLockObject = blockInfoLock.get(blockInfo);
+ if (null == blockInfoLockObject) {
+ blockInfoLockObject = new Object();
+ blockInfoLock.put(blockInfo, blockInfoLockObject);
}
}
- //acquire the lock for particular block info
- synchronized (blockInfoLockObject) {
- // check again whether block is present or not to avoid the
- // same block is loaded
- //more than once in case of concurrent query
- tableBlock = tableBlockMapTemp.get(blockInfo);
- // if still block is not present then load the block
- if (null == tableBlock) {
- // getting the data file meta data of the block
- footer = CarbonUtil
- .readMetadatFile(blockInfo.getFilePath(), blockInfo.getBlockOffset(),
- blockInfo.getBlockLength());
- tableBlock = new BlockIndex();
- footer.setTableBlockInfo(blockInfo);
- // building the block
- tableBlock.buildIndex(Arrays.asList(footer));
- tableBlockMapTemp.put(blockInfo, tableBlock);
- // finally remove the lock object from block info lock as once block is loaded
- // it will not come inside this if condition
- blockInfoLock.remove(blockInfo);
- }
+ }
+ //acquire the lock for particular block info
+ synchronized (blockInfoLockObject) {
+ // check again whether block is present or not to avoid the
+ // same block is loaded
+ //more than once in case of concurrent query
+ tableBlock = tableBlockMapTemp.get(blockInfo);
+ // if still block is not present then load the block
+ if (null == tableBlock) {
+ blocksList.add(executor.submit(new BlockLoaderThread(blockInfo, tableBlockMapTemp)));
}
}
- loadedBlocksList.add(tableBlock);
+ } else {
+ // if the block is already loaded then set it directly at its position
+ // so that the blocks stay in sorted order
+ loadedBlock[counter] = tableBlock;
}
- } catch (CarbonUtilException e) {
- LOGGER.error("Problem while loading the block");
+ }
+ // shut down the executor gracefully and wait until all the tasks have finished
+ executor.shutdown();
+ try {
+ executor.awaitTermination(1, TimeUnit.HOURS);
+ } catch (InterruptedException e) {
throw new IndexBuilderException(e);
}
- return loadedBlocksList;
+ // fill the blocks that were not already loaded into the loaded blocks array
+ fillLoadedBlocks(loadedBlock, blocksList);
+ return Arrays.asList(loadedBlock);
+ }
+
+ /**
+ * Fills the given array with the blocks loaded in threads;
+ * the array will be used for query execution
+ *
+ * @param loadedBlockArray array of blocks which will be filled
+ * @param blocksList blocks loaded in thread
+ * @throws IndexBuilderException in case of any failure
+ */
+ private void fillLoadedBlocks(AbstractIndex[] loadedBlockArray,
+ List<Future<AbstractIndex>> blocksList) throws IndexBuilderException {
+ int blockCounter = 0;
+ for (int i = 0; i < loadedBlockArray.length; i++) {
+ if (null == loadedBlockArray[i]) {
+ try {
+ loadedBlockArray[i] = blocksList.get(blockCounter++).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IndexBuilderException(e);
+ }
+ }
+
+ }
+ }
+
+ private AbstractIndex loadBlock(Map<TableBlockInfo, AbstractIndex> tableBlockMapTemp,
+ TableBlockInfo blockInfo) throws CarbonUtilException {
+ AbstractIndex tableBlock;
+ DataFileFooter footer;
+ // getting the data file meta data of the block
+ footer = CarbonUtil.readMetadatFile(blockInfo.getFilePath(), blockInfo.getBlockOffset(),
+ blockInfo.getBlockLength());
+ tableBlock = new BlockIndex();
+ footer.setTableBlockInfo(blockInfo);
+ // building the block
+ tableBlock.buildIndex(Arrays.asList(footer));
+ tableBlockMapTemp.put(blockInfo, tableBlock);
+ // finally remove the lock object from block info lock as once block is loaded
+ // it will not come inside this if condition
+ blockInfoLock.remove(blockInfo);
+ return tableBlock;
}
/**
@@ -228,4 +282,28 @@ public class BlockIndexStore {
tableLockMap.remove(absoluteTableIdentifier);
tableBlocksMap.remove(absoluteTableIdentifier);
}
+
+ /**
+ * Thread class which will be used to load the blocks
+ */
+ private class BlockLoaderThread implements Callable<AbstractIndex> {
+ /**
+ * table block info to block index map
+ */
+ private Map<TableBlockInfo, AbstractIndex> tableBlockMap;
+
+ // block info
+ private TableBlockInfo blockInfo;
+
+ private BlockLoaderThread(TableBlockInfo blockInfo,
+ Map<TableBlockInfo, AbstractIndex> tableBlockMap) {
+ this.tableBlockMap = tableBlockMap;
+ this.blockInfo = blockInfo;
+ }
+
+ @Override public AbstractIndex call() throws Exception {
+ // load the block and return it
+ return loadBlock(tableBlockMap, blockInfo);
+ }
+ }
}