You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/06/30 17:42:13 UTC

[26/50] [abbrv] incubator-carbondata git commit: [Bug] Executor Btree loading performance improvement (#763)

[Bug] Executor Btree loading performance improvement (#763)

Make block-level B-tree loading in the executor multi-threaded.

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e6890929
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e6890929
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e6890929

Branch: refs/heads/master
Commit: e689092960045634ca3531918022d5030274eb1c
Parents: c6cafaa
Author: Kumar Vishal <ku...@gmail.com>
Authored: Tue Jun 28 00:09:23 2016 +0800
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Mon Jun 27 21:39:23 2016 +0530

----------------------------------------------------------------------
 .../core/carbon/datastore/BlockIndexStore.java  | 184 +++++++++++++------
 1 file changed, 131 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6890929/core/src/main/java/org/carbondata/core/carbon/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/carbon/datastore/BlockIndexStore.java b/core/src/main/java/org/carbondata/core/carbon/datastore/BlockIndexStore.java
index c24e6a9..7ef7a24 100644
--- a/core/src/main/java/org/carbondata/core/carbon/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/carbondata/core/carbon/datastore/BlockIndexStore.java
@@ -23,7 +23,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
@@ -34,6 +40,7 @@ import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.carbondata.core.carbon.datastore.exception.IndexBuilderException;
 import org.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
 import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.util.CarbonProperties;
 import org.carbondata.core.util.CarbonUtil;
 import org.carbondata.core.util.CarbonUtilException;
 
@@ -99,16 +106,24 @@ public class BlockIndexStore {
    */
   public List<AbstractIndex> loadAndGetBlocks(List<TableBlockInfo> tableBlocksInfos,
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException {
-    List<AbstractIndex> loadedBlocksList =
-        new ArrayList<AbstractIndex>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    AbstractIndex[] loadedBlock = new AbstractIndex[tableBlocksInfos.size()];
     addTableLockObject(absoluteTableIdentifier);
-    // sort the block infos
+    // sort the block info
     // so block will be loaded in sorted order this will be required for
     // query execution
     Collections.sort(tableBlocksInfos);
     // get the instance
     Object lockObject = tableLockMap.get(absoluteTableIdentifier);
     Map<TableBlockInfo, AbstractIndex> tableBlockMapTemp = null;
+    int numberOfCores = 1;
+    try {
+      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.NUM_CORES,
+              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+    } catch (NumberFormatException e) {
+      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(numberOfCores);
     // Acquire the lock to ensure only one query is loading the table blocks
     // if same block is assigned to both the queries
     synchronized (lockObject) {
@@ -120,63 +135,102 @@ public class BlockIndexStore {
       }
     }
     AbstractIndex tableBlock = null;
-    DataFileFooter footer = null;
-    try {
-      for (TableBlockInfo blockInfo : tableBlocksInfos) {
-        // if table block is already loaded then do not load
-        // that block
-        tableBlock = tableBlockMapTemp.get(blockInfo);
-        // if block is not loaded
-        if (null == tableBlock) {
-          // check any lock object is present in
-          // block info lock map
-          Object blockInfoLockObject = blockInfoLock.get(blockInfo);
-          // if lock object is not present then acquire
-          // the lock in block info lock and add a lock object in the map for
-          // particular block info, added double checking mechanism to add the lock
-          // object so in case of concurrent query we for same block info only one lock
-          // object will be added
-          if (null == blockInfoLockObject) {
-            synchronized (blockInfoLock) {
-              // again checking the block info lock, to check whether lock object is present
-              // or not if now also not present then add a lock object
-              blockInfoLockObject = blockInfoLock.get(blockInfo);
-              if (null == blockInfoLockObject) {
-                blockInfoLockObject = new Object();
-                blockInfoLock.put(blockInfo, blockInfoLockObject);
-              }
+    List<Future<AbstractIndex>> blocksList = new ArrayList<Future<AbstractIndex>>();
+    int counter = -1;
+    for (TableBlockInfo blockInfo : tableBlocksInfos) {
+      counter++;
+      // if table block is already loaded then do not load
+      // that block
+      tableBlock = tableBlockMapTemp.get(blockInfo);
+      // if block is not loaded
+      if (null == tableBlock) {
+        // check any lock object is present in
+        // block info lock map
+        Object blockInfoLockObject = blockInfoLock.get(blockInfo);
+        // if lock object is not present then acquire
+        // the lock in block info lock and add a lock object in the map for
+        // particular block info, added double checking mechanism to add the lock
+        // object so in case of concurrent query we for same block info only one lock
+        // object will be added
+        if (null == blockInfoLockObject) {
+          synchronized (blockInfoLock) {
+            // again checking the block info lock, to check whether lock object is present
+            // or not if now also not present then add a lock object
+            blockInfoLockObject = blockInfoLock.get(blockInfo);
+            if (null == blockInfoLockObject) {
+              blockInfoLockObject = new Object();
+              blockInfoLock.put(blockInfo, blockInfoLockObject);
             }
           }
-          //acquire the lock for particular block info
-          synchronized (blockInfoLockObject) {
-            // check again whether block is present or not to avoid the
-            // same block is loaded
-            //more than once in case of concurrent query
-            tableBlock = tableBlockMapTemp.get(blockInfo);
-            // if still block is not present then load the block
-            if (null == tableBlock) {
-              // getting the data file meta data of the block
-              footer = CarbonUtil
-                  .readMetadatFile(blockInfo.getFilePath(), blockInfo.getBlockOffset(),
-                      blockInfo.getBlockLength());
-              tableBlock = new BlockIndex();
-              footer.setTableBlockInfo(blockInfo);
-              // building the block
-              tableBlock.buildIndex(Arrays.asList(footer));
-              tableBlockMapTemp.put(blockInfo, tableBlock);
-              // finally remove the lock object from block info lock as once block is loaded
-              // it will not come inside this if condition
-              blockInfoLock.remove(blockInfo);
-            }
+        }
+        //acquire the lock for particular block info
+        synchronized (blockInfoLockObject) {
+          // check again whether block is present or not to avoid the
+          // same block is loaded
+          //more than once in case of concurrent query
+          tableBlock = tableBlockMapTemp.get(blockInfo);
+          // if still block is not present then load the block
+          if (null == tableBlock) {
+            blocksList.add(executor.submit(new BlockLoaderThread(blockInfo, tableBlockMapTemp)));
           }
         }
-        loadedBlocksList.add(tableBlock);
+      } else {
+        // if the block is already loaded then directly set it at its position
+        // so the blocks will be present in sorted order
+        loadedBlock[counter] = tableBlock;
       }
-    } catch (CarbonUtilException e) {
-      LOGGER.error("Problem while loading the block");
+    }
+    // shut down the executor gracefully and wait until all the tasks have finished
+    executor.shutdown();
+    try {
+      executor.awaitTermination(1, TimeUnit.HOURS);
+    } catch (InterruptedException e) {
       throw new IndexBuilderException(e);
     }
-    return loadedBlocksList;
+    // fill the blocks which were not loaded before into the loaded blocks array
+    fillLoadedBlocks(loadedBlock, blocksList);
+    return Arrays.asList(loadedBlock);
+  }
+
+  /**
+   * Fills every still-empty (null) slot of the array, in index order, with the result
+   * of the next future taken from blocksList; each Future.get() blocks until it is done.
+   * NOTE(review): assumes one future per null slot, in the same order - confirm in caller.
+   * @param loadedBlockArray array of blocks; its null entries are the ones to be filled
+   * @param blocksList       futures of the blocks that were submitted for loading
+   * @throws IndexBuilderException if waiting is interrupted or a load task failed
+   */
+  private void fillLoadedBlocks(AbstractIndex[] loadedBlockArray,
+      List<Future<AbstractIndex>> blocksList) throws IndexBuilderException {
+    int blockCounter = 0;
+    for (int i = 0; i < loadedBlockArray.length; i++) {
+      if (null == loadedBlockArray[i]) {
+        try {
+          loadedBlockArray[i] = blocksList.get(blockCounter++).get();
+        } catch (InterruptedException | ExecutionException e) {
+          throw new IndexBuilderException(e);
+        }
+      }
+
+    }
+  }
+
+  private AbstractIndex loadBlock(Map<TableBlockInfo, AbstractIndex> tableBlockMapTemp,
+      TableBlockInfo blockInfo) throws CarbonUtilException {
+    AbstractIndex tableBlock;
+    DataFileFooter footer;
+    // read the data file footer (metadata) using the block's path, offset and length
+    footer = CarbonUtil.readMetadatFile(blockInfo.getFilePath(), blockInfo.getBlockOffset(),
+        blockInfo.getBlockLength());
+    tableBlock = new BlockIndex();
+    footer.setTableBlockInfo(blockInfo);
+    // build the block index from that single footer and store it in the given map
+    tableBlock.buildIndex(Arrays.asList(footer));
+    tableBlockMapTemp.put(blockInfo, tableBlock);
+    // the block is now present in the map, so finally remove the lock object
+    // that was registered for this block info in blockInfoLock
+    blockInfoLock.remove(blockInfo);
+    return tableBlock;
   }
 
   /**
@@ -228,4 +282,28 @@ public class BlockIndexStore {
     tableLockMap.remove(absoluteTableIdentifier);
     tableBlocksMap.remove(absoluteTableIdentifier);
   }
+
+  /**
+   * Callable task (not a Thread, despite the name) that loads the index of one block
+   */
+  private class BlockLoaderThread implements Callable<AbstractIndex> {
+    /**
+     * table block info to block index map, handed on to loadBlock
+     */
+    private Map<TableBlockInfo, AbstractIndex> tableBlockMap;
+
+    // block info of the block this task loads
+    private TableBlockInfo blockInfo;
+
+    private BlockLoaderThread(TableBlockInfo blockInfo,
+        Map<TableBlockInfo, AbstractIndex> tableBlockMap) {
+      this.tableBlockMap = tableBlockMap;
+      this.blockInfo = blockInfo;
+    }
+
+    @Override public AbstractIndex call() throws Exception {
+      // delegate to the enclosing class's loadBlock and return the block it built
+      return loadBlock(tableBlockMap, blockInfo);
+    }
+  }
 }