You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/27 10:21:07 UTC
[1/2] carbondata git commit: [CARBONDATA-2407]Removed All Unused Executor BTree code
Repository: carbondata
Updated Branches:
refs/heads/master 2f85381f8 -> 61788353d
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
deleted file mode 100644
index 2f28861..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.util;
-
-import java.util.Map;
-
-import org.apache.carbondata.core.datastore.DataRefNode;
-import org.apache.carbondata.core.datastore.DataRefNodeFinder;
-import org.apache.carbondata.core.datastore.IndexKey;
-import org.apache.carbondata.core.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
-import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-/**
- * Block level traverser
- */
-public class BlockLevelTraverser {
-
- /**
- *
- * @param abstractIndex
- * @param blockRowMap
- * @param segId
- * @param updateStatusManager
- * @throws KeyGenException
- */
- public long getBlockRowMapping(AbstractIndex abstractIndex, Map<String, Long> blockRowMap,
- String segId, SegmentUpdateStatusManager updateStatusManager)
- throws KeyGenException {
-
- IndexKey searchStartKey =
- FilterUtil.prepareDefaultStartIndexKey(abstractIndex.getSegmentProperties());
-
- DataRefNodeFinder blockFinder = new BTreeDataRefNodeFinder(
- abstractIndex.getSegmentProperties().getEachDimColumnValueSize(),
- abstractIndex.getSegmentProperties().getNumberOfSortColumns(),
- abstractIndex.getSegmentProperties().getNumberOfNoDictSortColumns());
- DataRefNode currentBlock =
- blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), searchStartKey);
-
- long count = 0;
-
- while (currentBlock != null) {
-
- String blockName = ((BlockBTreeLeafNode) currentBlock).getTableBlockInfo().getFilePath();
- blockName = CarbonTablePath.getCarbonDataFileName(blockName);
- blockName = blockName + CarbonTablePath.getCarbonDataExtension();
-
- long rowCount = currentBlock.numRows();
-
- String key = CarbonUpdateUtil.getSegmentBlockNameKey(segId, blockName);
-
- // if block is invalid then dont add the count
- SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
-
- if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus())) {
- blockRowMap.put(key, rowCount);
- count++;
- }
- currentBlock = currentBlock.getNextDataRefNode();
- }
-
- return count;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
deleted file mode 100644
index d30891a..0000000
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.carbon.datastore;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.BlockIndexStore;
-import org.apache.carbondata.core.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.StoreCreator;
-
-import junit.framework.TestCase;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class BlockIndexStoreTest extends TestCase {
-
- // private BlockIndexStore indexStore;
- BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache;
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(BlockIndexStoreTest.class.getName());
-
- @BeforeClass public void setUp() {
- StoreCreator.createCarbonStore();
- CarbonProperties.getInstance().
- addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10");
- CacheProvider cacheProvider = CacheProvider.getInstance();
- cache = (BlockIndexStore) cacheProvider.createCache(CacheType.EXECUTOR_BTREE);
- }
-
- @AfterClass public void tearDown() {
- }
-
- @Test public void testEmpty() {
-
- }
-
- private List<TableBlockUniqueIdentifier> getTableBlockUniqueIdentifierList(List<TableBlockInfo> tableBlockInfos,
- AbsoluteTableIdentifier absoluteTableIdentifier) {
- List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = new ArrayList<>();
- for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
- tableBlockUniqueIdentifiers.add(new TableBlockUniqueIdentifier(absoluteTableIdentifier, tableBlockInfo));
- }
- return tableBlockUniqueIdentifiers;
- }
-
- private class BlockLoaderThread implements Callable<Void> {
- private List<TableBlockInfo> tableBlockInfoList;
- private AbsoluteTableIdentifier absoluteTableIdentifier;
-
- public BlockLoaderThread(List<TableBlockInfo> tableBlockInfoList,
- AbsoluteTableIdentifier absoluteTableIdentifier) {
- this.tableBlockInfoList = tableBlockInfoList;
- this.absoluteTableIdentifier = absoluteTableIdentifier;
- }
-
- @Override public Void call() throws Exception {
- List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifierList =
- getTableBlockUniqueIdentifierList(tableBlockInfoList, absoluteTableIdentifier);
- cache.getAll(tableBlockUniqueIdentifierList);
- return null;
- }
-
- }
-
- private static File getPartFile() {
- String path = StoreCreator.getIdentifier().getTablePath()
- + "/Fact/Part0/Segment_0";
- File file = new File(path);
- File[] files = file.listFiles();
- File part = null;
- for (int i = 0; i < files.length; i++) {
- if (files[i].getName().startsWith("part")) {
- part = files[i];
- }
- }
- return part;
- }
-
-}
[2/2] carbondata git commit: [CARBONDATA-2407]Removed All Unused Executor BTree code
Posted by ja...@apache.org.
[CARBONDATA-2407]Removed All Unused Executor BTree code
This closes #2234
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/61788353
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/61788353
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/61788353
Branch: refs/heads/master
Commit: 61788353d85ef2d35a543efec0d771159a7198a1
Parents: 2f85381
Author: kumarvishal09 <ku...@gmail.com>
Authored: Thu Apr 26 22:30:32 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Apr 27 18:20:56 2018 +0800
----------------------------------------------------------------------
.../carbondata/core/cache/CacheProvider.java | 5 -
.../datastore/AbstractBlockIndexStoreCache.java | 111 ------
.../core/datastore/BlockIndexStore.java | 386 -------------------
.../core/datastore/block/BlockIndex.java | 50 ---
.../impl/btree/BlockletBTreeBuilder.java | 102 -----
.../impl/btree/BlockletBTreeLeafNode.java | 205 ----------
.../executor/impl/AbstractQueryExecutor.java | 54 +--
.../scan/filter/FilterExpressionProcessor.java | 84 ----
.../core/scan/filter/FilterProcessor.java | 14 -
.../core/cache/CacheProviderTest.java | 16 -
.../core/datastore/block/BlockIndexTest.java | 77 ----
.../carbondata/hadoop/internal/index/Index.java | 47 ---
.../hadoop/internal/index/IndexLoader.java | 33 --
.../internal/index/impl/InMemoryBTreeIndex.java | 222 -----------
.../index/impl/InMemoryBTreeIndexLoader.java | 29 --
.../hadoop/internal/segment/SegmentManager.java | 54 ---
.../internal/segment/SegmentManagerFactory.java | 28 --
.../internal/segment/impl/IndexedSegment.java | 71 ----
.../segment/impl/zk/ZkSegmentManager.java | 54 ---
.../hadoop/util/BlockLevelTraverser.java | 87 -----
.../carbon/datastore/BlockIndexStoreTest.java | 108 ------
21 files changed, 18 insertions(+), 1819 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
index c3f8540..d29c087 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
@@ -27,10 +27,7 @@ import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentif
import org.apache.carbondata.core.cache.dictionary.ForwardDictionaryCache;
import org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.BlockIndexStore;
import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
-import org.apache.carbondata.core.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
import org.apache.carbondata.core.indexstore.BlockletDataMapIndexStore;
import org.apache.carbondata.core.util.CarbonProperties;
@@ -117,8 +114,6 @@ public class CacheProvider {
} else if (cacheType.equals(CacheType.FORWARD_DICTIONARY)) {
cacheObject =
new ForwardDictionaryCache<DictionaryColumnUniqueIdentifier, Dictionary>(carbonLRUCache);
- } else if (cacheType.equals(cacheType.EXECUTOR_BTREE)) {
- cacheObject = new BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex>(carbonLRUCache);
} else if (cacheType.equals(cacheType.DRIVER_BTREE)) {
cacheObject =
new SegmentTaskIndexStore(carbonLRUCache);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/core/src/main/java/org/apache/carbondata/core/datastore/AbstractBlockIndexStoreCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/AbstractBlockIndexStoreCache.java b/core/src/main/java/org/apache/carbondata/core/datastore/AbstractBlockIndexStoreCache.java
deleted file mode 100644
index 3a62783..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/AbstractBlockIndexStoreCache.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CarbonLRUCache;
-import org.apache.carbondata.core.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.datastore.block.BlockInfo;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
-import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.ObjectSizeCalculator;
-
-/**
- * This class validate and load the B-Tree in the executor lru cache
- * @param <K> cache key
- * @param <V> Block Meta data details
- */
-public abstract class AbstractBlockIndexStoreCache<K, V>
- implements Cache<TableBlockUniqueIdentifier, AbstractIndex> {
-
- /**
- * CarbonLRU cache
- */
- protected CarbonLRUCache lruCache;
-
- /**
- * table segment id vs blockInfo list
- */
- protected Map<String, List<BlockInfo>> segmentIdToBlockListMap;
-
-
- /**
- * map of block info to lock object map, while loading the btree this will be filled
- * and removed after loading the tree for that particular block info, this will be useful
- * while loading the tree concurrently so only block level lock will be applied another
- * block can be loaded concurrently
- */
- protected Map<BlockInfo, Object> blockInfoLock;
-
- /**
- * The object will hold the segment ID lock so that at a time only 1 block that belongs to same
- * segment & table can create the list for holding the block info
- */
- protected Map<String, Object> segmentIDLock;
-
- public AbstractBlockIndexStoreCache(CarbonLRUCache lruCache) {
- this.lruCache = lruCache;
- blockInfoLock = new ConcurrentHashMap<BlockInfo, Object>();
- segmentIDLock = new ConcurrentHashMap<String, Object>();
- segmentIdToBlockListMap = new ConcurrentHashMap<>();
- }
-
- /**
- * This method will get the value for the given key. If value does not exist
- * for the given key, it will check and load the value.
- *
- * @param tableBlock
- * @param tableBlockUniqueIdentifier
- * @param lruCacheKey
- */
- protected void checkAndLoadTableBlocks(AbstractIndex tableBlock,
- TableBlockUniqueIdentifier tableBlockUniqueIdentifier, String lruCacheKey)
- throws IOException {
- // calculate the required size is
- TableBlockInfo blockInfo = tableBlockUniqueIdentifier.getTableBlockInfo();
- long requiredMetaSize = CarbonUtil.calculateMetaSize(blockInfo);
- if (requiredMetaSize > 0) {
- // load table blocks data
- // getting the data file meta data of the block
- DataFileFooter footer = CarbonUtil.readMetadatFile(blockInfo);
- footer.setBlockInfo(new BlockInfo(blockInfo));
- // building the block
- tableBlock.buildIndex(Collections.singletonList(footer));
- requiredMetaSize = ObjectSizeCalculator.estimate(blockInfo, requiredMetaSize);
- tableBlock.setMemorySize(requiredMetaSize);
- tableBlock.incrementAccessCount();
- boolean isTableBlockAddedToLruCache = lruCache.put(lruCacheKey, tableBlock, requiredMetaSize);
- if (!isTableBlockAddedToLruCache) {
- throw new IndexBuilderException(
- "Cannot load table blocks into memory. Not enough memory available");
- }
- } else {
- throw new IndexBuilderException(
- "Invalid carbon data file: " + blockInfo.getFilePath());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
deleted file mode 100644
index f2c38fa..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.CarbonLRUCache;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.datastore.block.BlockIndex;
-import org.apache.carbondata.core.datastore.block.BlockInfo;
-import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
-import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.mutate.UpdateVO;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.TaskMetricsMap;
-
-/**
- * This class is used to load the B-Tree in Executor LRU Cache
- */
-public class BlockIndexStore<K, V> extends AbstractBlockIndexStoreCache<K, V> {
-
- /**
- * LOGGER instance
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(BlockIndexStore.class.getName());
- public BlockIndexStore(CarbonLRUCache lruCache) {
- super(lruCache);
- }
-
- /**
- * The method loads the block meta in B-tree lru cache and returns the block meta.
- *
- * @param tableBlockUniqueIdentifier Uniquely identifies the block
- * @return returns the blocks B-Tree meta
- */
- @Override public AbstractIndex get(TableBlockUniqueIdentifier tableBlockUniqueIdentifier)
- throws IOException {
- TableBlockInfo tableBlockInfo = tableBlockUniqueIdentifier.getTableBlockInfo();
- BlockInfo blockInfo = new BlockInfo(tableBlockInfo);
- String lruCacheKey =
- getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo);
- AbstractIndex tableBlock = (AbstractIndex) lruCache.get(lruCacheKey);
-
- // if block is not loaded
- if (null == tableBlock) {
- // check any lock object is present in
- // block info lock map
- Object blockInfoLockObject = blockInfoLock.get(blockInfo);
- // if lock object is not present then acquire
- // the lock in block info lock and add a lock object in the map for
- // particular block info, added double checking mechanism to add the lock
- // object so in case of concurrent query we for same block info only one lock
- // object will be added
- if (null == blockInfoLockObject) {
- synchronized (blockInfoLock) {
- // again checking the block info lock, to check whether lock object is present
- // or not if now also not present then add a lock object
- blockInfoLockObject = blockInfoLock.get(blockInfo);
- if (null == blockInfoLockObject) {
- blockInfoLockObject = new Object();
- blockInfoLock.put(blockInfo, blockInfoLockObject);
- }
- }
- }
- //acquire the lock for particular block info
- synchronized (blockInfoLockObject) {
- // check again whether block is present or not to avoid the
- // same block is loaded
- //more than once in case of concurrent query
- tableBlock = (AbstractIndex) lruCache.get(
- getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
- // if still block is not present then load the block
- if (null == tableBlock) {
- tableBlock = loadBlock(tableBlockUniqueIdentifier);
- fillSegmentIdToBlockListMap(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(),
- blockInfo);
- }
- }
- } else {
- tableBlock.incrementAccessCount();
- }
- return tableBlock;
- }
-
- /**
- * @param absoluteTableIdentifier
- * @param blockInfo
- */
- private void fillSegmentIdToBlockListMap(AbsoluteTableIdentifier absoluteTableIdentifier,
- BlockInfo blockInfo) {
- TableSegmentUniqueIdentifier segmentIdentifier =
- new TableSegmentUniqueIdentifier(absoluteTableIdentifier,
- blockInfo.getTableBlockInfo().getSegmentId());
- String uniqueTableSegmentIdentifier = segmentIdentifier.getUniqueTableSegmentIdentifier();
- List<BlockInfo> blockInfos =
- segmentIdToBlockListMap.get(uniqueTableSegmentIdentifier);
- if (null == blockInfos) {
- Object segmentLockObject = segmentIDLock.get(uniqueTableSegmentIdentifier);
- if (null == segmentLockObject) {
- synchronized (segmentIDLock) {
- segmentLockObject = segmentIDLock.get(uniqueTableSegmentIdentifier);
- if (null == segmentLockObject) {
- segmentLockObject = new Object();
- segmentIDLock.put(uniqueTableSegmentIdentifier, segmentLockObject);
- }
- }
- }
- synchronized (segmentLockObject) {
- blockInfos =
- segmentIdToBlockListMap.get(segmentIdentifier.getUniqueTableSegmentIdentifier());
- if (null == blockInfos) {
- blockInfos = new CopyOnWriteArrayList<>();
- segmentIdToBlockListMap.put(uniqueTableSegmentIdentifier, blockInfos);
- }
- blockInfos.add(blockInfo);
- }
- } else {
- blockInfos.add(blockInfo);
- }
- }
-
- /**
- * The method takes list of tableblocks as input and load them in btree lru cache
- * and returns the list of data blocks meta
- *
- * @param tableBlocksInfos List of unique table blocks
- * @return List<AbstractIndex>
- * @throws IndexBuilderException
- */
- @Override public List<AbstractIndex> getAll(List<TableBlockUniqueIdentifier> tableBlocksInfos)
- throws IndexBuilderException {
- AbstractIndex[] loadedBlock = new AbstractIndex[tableBlocksInfos.size()];
- int numberOfCores = 1;
- try {
- numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.NUM_CORES,
- CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
- } catch (NumberFormatException e) {
- numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
- }
- ExecutorService executor = Executors.newFixedThreadPool(numberOfCores);
- List<Future<AbstractIndex>> blocksList = new ArrayList<Future<AbstractIndex>>();
- for (TableBlockUniqueIdentifier tableBlockUniqueIdentifier : tableBlocksInfos) {
- blocksList.add(executor.submit(new BlockLoaderThread(tableBlockUniqueIdentifier)));
- }
- // shutdown the executor gracefully and wait until all the task is finished
- executor.shutdown();
- try {
- executor.awaitTermination(1, TimeUnit.HOURS);
- } catch (InterruptedException e) {
- throw new IndexBuilderException(e);
- }
- // fill the block which were not loaded before to loaded blocks array
- fillLoadedBlocks(loadedBlock, blocksList);
- return Arrays.asList(loadedBlock);
- }
-
- private String getLruCacheKey(AbsoluteTableIdentifier absoluteTableIdentifier,
- BlockInfo blockInfo) {
- CarbonTableIdentifier carbonTableIdentifier =
- absoluteTableIdentifier.getCarbonTableIdentifier();
- return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
- + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE
- + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + blockInfo
- .getBlockUniqueName();
- }
-
- /**
- * method returns the B-Tree meta
- *
- * @param tableBlockUniqueIdentifier Unique table block info
- * @return
- */
- @Override public AbstractIndex getIfPresent(
- TableBlockUniqueIdentifier tableBlockUniqueIdentifier) {
- BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo());
- BlockIndex cacheable = (BlockIndex) lruCache
- .get(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
- if (null != cacheable) {
- cacheable.incrementAccessCount();
- }
- return cacheable;
- }
-
- /**
- * the method removes the entry from cache.
- *
- * @param tableBlockUniqueIdentifier
- */
- @Override public void invalidate(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) {
- BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo());
- lruCache
- .remove(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
- }
-
- @Override public void clearAccessCount(List<TableBlockUniqueIdentifier> keys) {
- for (TableBlockUniqueIdentifier tableBlockUniqueIdentifier : keys) {
- SegmentTaskIndexWrapper cacheable = (SegmentTaskIndexWrapper) lruCache
- .get(tableBlockUniqueIdentifier.getUniqueTableBlockName());
- cacheable.clear();
- }
- }
-
- /**
- * Below method will be used to fill the loaded blocks to the array
- * which will be used for query execution
- *
- * @param loadedBlockArray array of blocks which will be filled
- * @param blocksList blocks loaded in thread
- * @throws IndexBuilderException in case of any failure
- */
- private void fillLoadedBlocks(AbstractIndex[] loadedBlockArray,
- List<Future<AbstractIndex>> blocksList) throws IndexBuilderException {
- int blockCounter = 0;
- boolean exceptionOccurred = false;
- Throwable exceptionRef = null;
- for (int i = 0; i < loadedBlockArray.length; i++) {
- try {
- loadedBlockArray[i] = blocksList.get(blockCounter++).get();
- } catch (Throwable e) {
- exceptionOccurred = true;
- exceptionRef = e;
- }
- }
- if (exceptionOccurred) {
- LOGGER.error("Block B-tree loading failed. Clearing the access count of the loaded blocks.");
- // in case of any failure clear the access count for the valid loaded blocks
- clearAccessCountForLoadedBlocks(loadedBlockArray);
- throw new IndexBuilderException("Block B-tree loading failed", exceptionRef);
- }
- }
-
- /**
- * This method will clear the access count for the loaded blocks
- *
- * @param loadedBlockArray
- */
- private void clearAccessCountForLoadedBlocks(AbstractIndex[] loadedBlockArray) {
- for (int i = 0; i < loadedBlockArray.length; i++) {
- if (null != loadedBlockArray[i]) {
- loadedBlockArray[i].clear();
- }
- }
- }
-
- /**
- * Thread class which will be used to load the blocks
- */
- private class BlockLoaderThread implements Callable<AbstractIndex> {
- // table block unique identifier
- private TableBlockUniqueIdentifier tableBlockUniqueIdentifier;
-
- private BlockLoaderThread(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) {
- this.tableBlockUniqueIdentifier = tableBlockUniqueIdentifier;
- }
-
- @Override public AbstractIndex call() throws Exception {
- try {
- //register thread callback for calculating metrics
- TaskMetricsMap.getInstance().registerThreadCallback();
- // load and return the loaded blocks
- return get(tableBlockUniqueIdentifier);
- } finally {
- // update read bytes metrics for this thread
- TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId());
- }
- }
- }
-
- private AbstractIndex loadBlock(TableBlockUniqueIdentifier tableBlockUniqueIdentifier)
- throws IOException {
- AbstractIndex tableBlock = new BlockIndex();
- BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo());
- String lruCacheKey =
- getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo);
- checkAndLoadTableBlocks(tableBlock, tableBlockUniqueIdentifier, lruCacheKey);
- // finally remove the lock object from block info lock as once block is loaded
- // it will not come inside this if condition
- blockInfoLock.remove(blockInfo);
- return tableBlock;
- }
-
- /**
- * This will be used to remove a particular blocks useful in case of
- * deletion of some of the blocks in case of retention or may be some other
- * scenario
- *
- * @param segmentIds list of table blocks to be removed
- * @param absoluteTableIdentifier absolute table identifier
- */
- public void removeTableBlocks(List<String> segmentIds,
- AbsoluteTableIdentifier absoluteTableIdentifier) {
- if (null == segmentIds) {
- return;
- }
- for (String segmentId : segmentIds) {
- TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
- new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
- List<BlockInfo> blockInfos = segmentIdToBlockListMap
- .remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
- if (null != blockInfos) {
- for (BlockInfo blockInfo : blockInfos) {
- String lruCacheKey = getLruCacheKey(absoluteTableIdentifier, blockInfo);
- lruCache.remove(lruCacheKey);
- }
- }
- }
- }
-
- /**
- * remove TableBlocks executer level If Horizontal Compaction Done
- * @param queryModel
- */
- public void removeTableBlocksIfHorizontalCompactionDone(QueryModel queryModel) {
- // get the invalid segments blocks details
- Map<String, UpdateVO> invalidBlocksVO = queryModel.getInvalidBlockVOForSegmentId();
- if (!invalidBlocksVO.isEmpty()) {
- UpdateVO updateMetadata;
- Iterator<Map.Entry<String, UpdateVO>> itr = invalidBlocksVO.entrySet().iterator();
- String blockTimestamp = null;
- while (itr.hasNext()) {
- Map.Entry<String, UpdateVO> entry = itr.next();
- TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
- new TableSegmentUniqueIdentifier(queryModel.getAbsoluteTableIdentifier(),
- entry.getKey());
- List<BlockInfo> blockInfos = segmentIdToBlockListMap
- .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
- if (null != blockInfos) {
- for (BlockInfo blockInfo : blockInfos) {
- // reading the updated block names from status manager instance
- blockTimestamp = blockInfo.getBlockUniqueName()
- .substring(blockInfo.getBlockUniqueName().lastIndexOf('-') + 1,
- blockInfo.getBlockUniqueName().length());
- updateMetadata = entry.getValue();
- if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(Long.parseLong(blockTimestamp))) {
- Long blockTimeStamp = Long.parseLong(blockTimestamp);
- if (blockTimeStamp > updateMetadata.getFactTimestamp() && (
- updateMetadata.getUpdateDeltaStartTimestamp() != null
- && blockTimeStamp < updateMetadata.getUpdateDeltaStartTimestamp())) {
- String lruCacheKey =
- getLruCacheKey(queryModel.getAbsoluteTableIdentifier(), blockInfo);
- lruCache.remove(lruCacheKey);
- }
- }
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockIndex.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockIndex.java
deleted file mode 100644
index 7e24dba..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockIndex.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.datastore.block;
-
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.BTreeBuilderInfo;
-import org.apache.carbondata.core.datastore.BtreeBuilder;
-import org.apache.carbondata.core.datastore.impl.btree.BlockletBTreeBuilder;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-
-/**
- * Class which is responsible for loading the b+ tree block. This class will
- * persist all the detail of a table block
- */
-public class BlockIndex extends AbstractIndex {
-
- /**
- * Below method will be used to load the data block
- *
- */
- public void buildIndex(List<DataFileFooter> footerList) {
- // create a metadata details
- // this will be useful in query handling
- segmentProperties = new SegmentProperties(footerList.get(0).getColumnInTable(),
- footerList.get(0).getSegmentInfo().getColumnCardinality());
- // create a segment builder info
- BTreeBuilderInfo indexBuilderInfo =
- new BTreeBuilderInfo(footerList, segmentProperties.getDimensionColumnsValueSize());
- BtreeBuilder blocksBuilder = new BlockletBTreeBuilder();
- // load the metadata
- blocksBuilder.build(indexBuilderInfo);
- dataRefNode = blocksBuilder.get();
- totalNumberOfRows = footerList.get(0).getNumberOfRows();
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeBuilder.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeBuilder.java
deleted file mode 100644
index f97a469..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeBuilder.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.datastore.impl.btree;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.BTreeBuilderInfo;
-import org.apache.carbondata.core.datastore.IndexKey;
-
-/**
- * Btree based builder which will build the leaf node in a b+ tree format
- */
-public class BlockletBTreeBuilder extends AbstractBTreeBuilder {
-
- /**
- * Attribute for Carbon LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(BlockletBTreeBuilder.class.getName());
-
- /**
- * Below method will be used to build the segment info bplus tree format
- * Tree will be a read only tree, and it will be build on Bottoms up approach
- * first all the leaf node will be built and then intermediate node
- * in our case one leaf node will have not only one entry it will have group of entries
- */
- @Override public void build(BTreeBuilderInfo segmentBuilderInfos) {
- long totalNumberOfTuple = 0;
- int groupCounter;
- int nInternal = 0;
- BTreeNode curNode = null;
- BTreeNode prevNode = null;
- List<BTreeNode[]> nodeGroups =
- new ArrayList<BTreeNode[]>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- BTreeNode[] currentGroup = null;
- List<List<IndexKey>> interNSKeyList =
- new ArrayList<List<IndexKey>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- List<IndexKey> leafNSKeyList = null;
- long nodeNumber = 0;
- for (int index = 0;
- index < segmentBuilderInfos.getFooterList().get(0).getBlockletList()
- .size(); index++) {
- // creating a leaf node
- curNode = new BlockletBTreeLeafNode(segmentBuilderInfos, index, nodeNumber++);
- totalNumberOfTuple +=
- segmentBuilderInfos.getFooterList().get(0).getBlockletList().get(index)
- .getNumberOfRows();
- nLeaf++;
- // setting a next node as its a b+tree
- // so all the leaf node will be chained
- // will be stored in linked list
- if (prevNode != null) {
- prevNode.setNextNode(curNode);
- }
- prevNode = curNode;
- // as intermediate node will have more than one leaf
- // in cerating a group
- groupCounter = (nLeaf - 1) % (maxNumberOfEntriesInNonLeafNodes);
- if (groupCounter == 0) {
- // Create new node group if current group is full
- leafNSKeyList = new ArrayList<IndexKey>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- currentGroup = new BTreeNode[maxNumberOfEntriesInNonLeafNodes];
- nodeGroups.add(currentGroup);
- nInternal++;
- interNSKeyList.add(leafNSKeyList);
- }
- if (null != leafNSKeyList) {
- leafNSKeyList.add(convertStartKeyToNodeEntry(
- segmentBuilderInfos.getFooterList().get(0).getBlockletList().get(index)
- .getBlockletIndex().getBtreeIndex().getStartKey()));
- }
- if (null != currentGroup) {
- currentGroup[groupCounter] = curNode;
- }
- }
- if (totalNumberOfTuple == 0) {
- return;
- }
- // adding a intermediate node
- addIntermediateNode(curNode, nodeGroups, currentGroup, interNSKeyList, nInternal);
- LOGGER.info("****************************Total Number Rows In BTREE: " + totalNumberOfTuple);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java
deleted file mode 100644
index ddd7fcf..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.datastore.impl.btree;
-
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
-import org.apache.carbondata.core.datastore.BTreeBuilderInfo;
-import org.apache.carbondata.core.datastore.FileReader;
-import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory;
-import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
-import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
-import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
-
-/**
- * Leaf node class of a Blocklet btree
- */
-public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode {
-
- /**
- * reader for dimension chunk
- */
- private DimensionColumnChunkReader dimensionChunksReader;
-
- /**
- * reader of measure chunk
- */
- private MeasureColumnChunkReader measureColumnChunkReader;
-
- /**
- * reader for dimension chunk of page level
- */
- private DimensionColumnChunkReader dimensionChunksPageLevelReader;
-
- /**
- * reader of measure chunk of page level
- */
- private MeasureColumnChunkReader measureColumnChunkPageLevelReader;
-
- /**
- * number of pages in blocklet
- */
- private int numberOfPages;
-
- private int[] pageRowCount;
-
- /**
- * Create a leaf node
- *
- * @param builderInfos builder infos which have required metadata to create a leaf node
- * @param leafIndex leaf node index
- * @param nodeNumber node number of the node
- * this will be used during query execution when we can
- * give some leaf node of a btree to one executor some to other
- */
- BlockletBTreeLeafNode(BTreeBuilderInfo builderInfos, int leafIndex, long nodeNumber) {
- // get a lead node min max
- BlockletMinMaxIndex minMaxIndex =
- builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex).getBlockletIndex()
- .getMinMaxIndex();
- // max key of the columns
- maxKeyOfColumns = minMaxIndex.getMaxValues();
- // min keys of the columns
- minKeyOfColumns = minMaxIndex.getMinValues();
- // number of keys present in the leaf
- numberOfKeys =
- builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex).getNumberOfRows();
- // create a instance of dimension chunk
- dimensionChunksReader = CarbonDataReaderFactory.getInstance()
- .getDimensionColumnChunkReader(builderInfos.getFooterList().get(0).getVersionId(),
- builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex),
- builderInfos.getDimensionColumnValueSize(),
- builderInfos.getFooterList().get(0).getBlockInfo().getTableBlockInfo().getFilePath(),
- false);
- // create a instance of measure column chunk reader
- measureColumnChunkReader = CarbonDataReaderFactory.getInstance()
- .getMeasureColumnChunkReader(builderInfos.getFooterList().get(0).getVersionId(),
- builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex),
- builderInfos.getFooterList().get(0).getBlockInfo().getTableBlockInfo().getFilePath(),
- false);
- // create a instance of dimension chunk
- dimensionChunksPageLevelReader = CarbonDataReaderFactory.getInstance()
- .getDimensionColumnChunkReader(builderInfos.getFooterList().get(0).getVersionId(),
- builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex),
- builderInfos.getDimensionColumnValueSize(),
- builderInfos.getFooterList().get(0).getBlockInfo().getTableBlockInfo().getFilePath(),
- true);
- // create a instance of measure column chunk reader
- measureColumnChunkPageLevelReader = CarbonDataReaderFactory.getInstance()
- .getMeasureColumnChunkReader(builderInfos.getFooterList().get(0).getVersionId(),
- builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex),
- builderInfos.getFooterList().get(0).getBlockInfo().getTableBlockInfo().getFilePath(),
- true);
-
- this.nodeNumber = nodeNumber;
- this.numberOfPages =
- builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex).getNumberOfPages();
- this.pageRowCount = new int[numberOfPages];
- int numberOfPagesCompletelyFilled =
- numberOfKeys / CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
- int lastPageRowCount =
- numberOfKeys % CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
- for (int i = 0; i < numberOfPagesCompletelyFilled; i++) {
- pageRowCount[i] = CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
- }
- if (lastPageRowCount > 0) {
- pageRowCount[pageRowCount.length - 1] = lastPageRowCount;
- }
- }
-
- @Override public short blockletIndex() {
- return 0;
- }
-
- /**
- * Below method will be used to get the dimension chunks
- *
- * @param fileReader file reader to read the chunks from file
- * @param columnIndexRange indexes of the blocks need to be read
- * @return dimension data chunks
- */
- @Override public DimensionRawColumnChunk[] readDimensionChunks(FileReader fileReader,
- int[][] columnIndexRange) throws IOException {
- if (fileReader.isReadPageByPage()) {
- return dimensionChunksPageLevelReader.readRawDimensionChunks(fileReader, columnIndexRange);
- } else {
- return dimensionChunksReader.readRawDimensionChunks(fileReader, columnIndexRange);
- }
- }
-
- /**
- * Below method will be used to get the dimension chunk
- *
- * @param fileReader file reader to read the chunk from file
- * @param columnIndex block index to be read
- * @return dimension data chunk
- */
- @Override public DimensionRawColumnChunk readDimensionChunk(
- FileReader fileReader, int columnIndex) throws IOException {
- if (fileReader.isReadPageByPage()) {
- return dimensionChunksPageLevelReader.readRawDimensionChunk(fileReader, columnIndex);
- } else {
- return dimensionChunksReader.readRawDimensionChunk(fileReader, columnIndex);
- }
- }
-
- /**
- * Below method will be used to get the measure chunk
- *
- * @param fileReader file reader to read the chunk from file
- * @param columnIndexRange block indexes to be read from file
- * @return measure column data chunk
- */
- @Override public MeasureRawColumnChunk[] readMeasureChunks(FileReader fileReader,
- int[][] columnIndexRange) throws IOException {
- if (fileReader.isReadPageByPage()) {
- return measureColumnChunkPageLevelReader.readRawMeasureChunks(fileReader, columnIndexRange);
- } else {
- return measureColumnChunkReader.readRawMeasureChunks(fileReader, columnIndexRange);
- }
- }
-
- /**
- * Below method will be used to read the measure chunk
- *
- * @param fileReader file read to read the file chunk
- * @param columnIndex block index to be read from file
- * @return measure data chunk
- */
- @Override public MeasureRawColumnChunk readMeasureChunk(FileReader fileReader, int columnIndex)
- throws IOException {
- if (fileReader.isReadPageByPage()) {
- return measureColumnChunkPageLevelReader.readRawMeasureChunk(fileReader, columnIndex);
- } else {
- return measureColumnChunkReader.readRawMeasureChunk(fileReader, columnIndex);
- }
- }
-
- /**
- * @return the number of pages in blocklet
- */
- @Override public int numberOfPages() {
- return numberOfPages;
- }
-
- @Override public int getPageRowCount(int pageNumber) {
- return this.pageRowCount[pageNumber];
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 08c5cc7..bc40be8 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -32,11 +32,8 @@ import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.common.logging.impl.StandardLogService;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
-import org.apache.carbondata.core.datastore.BlockIndexStore;
import org.apache.carbondata.core.datastore.IndexKey;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -128,42 +125,27 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// query execution
Collections.sort(queryModel.getTableBlockInfos());
- if (queryModel.getTableBlockInfos().get(0).getDetailInfo() != null) {
- List<AbstractIndex> indexList = new ArrayList<>();
- Map<String, List<TableBlockInfo>> listMap = new LinkedHashMap<>();
- for (TableBlockInfo blockInfo: queryModel.getTableBlockInfos()) {
- List<TableBlockInfo> tableBlockInfos = listMap.get(blockInfo.getFilePath());
- if (tableBlockInfos == null) {
- tableBlockInfos = new ArrayList<>();
- listMap.put(blockInfo.getFilePath(), tableBlockInfos);
- }
- BlockletDetailInfo blockletDetailInfo = blockInfo.getDetailInfo();
- // This is the case of old stores where blocklet information is not available so read
- // the blocklet information from block file
- if (blockletDetailInfo.getBlockletInfo() == null) {
- readAndFillBlockletInfo(blockInfo, tableBlockInfos, blockletDetailInfo);
- } else {
- tableBlockInfos.add(blockInfo);
- }
+ List<AbstractIndex> indexList = new ArrayList<>();
+ Map<String, List<TableBlockInfo>> listMap = new LinkedHashMap<>();
+ for (TableBlockInfo blockInfo : queryModel.getTableBlockInfos()) {
+ List<TableBlockInfo> tableBlockInfos = listMap.get(blockInfo.getFilePath());
+ if (tableBlockInfos == null) {
+ tableBlockInfos = new ArrayList<>();
+ listMap.put(blockInfo.getFilePath(), tableBlockInfos);
}
- for (List<TableBlockInfo> tableBlockInfos: listMap.values()) {
- indexList.add(new IndexWrapper(tableBlockInfos));
+ BlockletDetailInfo blockletDetailInfo = blockInfo.getDetailInfo();
+ // This is the case of old stores where blocklet information is not available so read
+ // the blocklet information from block file
+ if (blockletDetailInfo.getBlockletInfo() == null) {
+ readAndFillBlockletInfo(blockInfo, tableBlockInfos, blockletDetailInfo);
+ } else {
+ tableBlockInfos.add(blockInfo);
}
- queryProperties.dataBlocks = indexList;
- } else {
- // get the table blocks
- CacheProvider cacheProvider = CacheProvider.getInstance();
- BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache =
- (BlockIndexStore) cacheProvider.createCache(CacheType.EXECUTOR_BTREE);
- // remove the invalid table blocks, block which is deleted or compacted
- cache.removeTableBlocks(queryModel.getInvalidSegmentIds(),
- queryModel.getAbsoluteTableIdentifier());
- List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
- prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(),
- queryModel.getAbsoluteTableIdentifier());
- cache.removeTableBlocksIfHorizontalCompactionDone(queryModel);
- queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers);
}
+ for (List<TableBlockInfo> tableBlockInfos : listMap.values()) {
+ indexList.add(new IndexWrapper(tableBlockInfos));
+ }
+ queryProperties.dataBlocks = indexList;
queryStatistic
.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index 0ccc990..6adfb3a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -17,18 +17,12 @@
package org.apache.carbondata.core.scan.filter;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.DataRefNode;
-import org.apache.carbondata.core.datastore.DataRefNodeFinder;
-import org.apache.carbondata.core.datastore.IndexKey;
-import org.apache.carbondata.core.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -93,84 +87,6 @@ public class FilterExpressionProcessor implements FilterProcessor {
}
/**
- * This API will scan the Segment level all btrees and selects the required
- * block reference nodes inorder to push the same to executer for applying filters
- * on the respective data reference node.
- * Following Algorithm is followed in below API
- * Step:1 Get the start end key based on the filter tree resolver information
- * Step:2 Prepare the IndexKeys inorder to scan the tree and get the start and end reference
- * node(block)
- * Step:3 Once data reference node ranges retrieved traverse the node within this range
- * and select the node based on the block min and max value and the filter value.
- * Step:4 The selected blocks will be send to executers for applying the filters with the help
- * of Filter executers.
- *
- */
- public List<DataRefNode> getFilterredBlocks(DataRefNode btreeNode,
- FilterResolverIntf filterResolver, AbstractIndex tableSegment) {
- // Need to get the current dimension tables
- List<DataRefNode> listOfDataBlocksToScan = new ArrayList<DataRefNode>();
- // getting the start and end index key based on filter for hitting the
- // selected block reference nodes based on filter resolver tree.
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("preparing the start and end key for finding"
- + "start and end block as per filter resolver");
- }
- IndexKey searchStartKey = null;
- IndexKey searchEndKey = null;
- try {
- searchStartKey = FilterUtil.prepareDefaultStartIndexKey(tableSegment.getSegmentProperties());
- searchEndKey = FilterUtil.prepareDefaultEndIndexKey(tableSegment.getSegmentProperties());
- } catch (KeyGenException e) {
- throw new RuntimeException(e);
- }
- if (LOGGER.isDebugEnabled()) {
- char delimiter = ',';
- LOGGER.debug(
- "Successfully retrieved the start and end key" + "Dictionary Start Key: " + joinByteArray(
- searchStartKey.getDictionaryKeys(), delimiter) + "No Dictionary Start Key "
- + joinByteArray(searchStartKey.getNoDictionaryKeys(), delimiter)
- + "Dictionary End Key: " + joinByteArray(searchEndKey.getDictionaryKeys(), delimiter)
- + "No Dictionary End Key " + joinByteArray(searchEndKey.getNoDictionaryKeys(),
- delimiter));
- }
- long startTimeInMillis = System.currentTimeMillis();
- DataRefNodeFinder blockFinder = new BTreeDataRefNodeFinder(
- tableSegment.getSegmentProperties().getEachDimColumnValueSize(),
- tableSegment.getSegmentProperties().getNumberOfSortColumns(),
- tableSegment.getSegmentProperties().getNumberOfNoDictSortColumns());
- DataRefNode startBlock = blockFinder.findFirstDataBlock(btreeNode, searchStartKey);
- DataRefNode endBlock = blockFinder.findLastDataBlock(btreeNode, searchEndKey);
- FilterExecuter filterExecuter =
- FilterUtil.getFilterExecuterTree(filterResolver, tableSegment.getSegmentProperties(),null);
- while (startBlock != endBlock) {
- addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, startBlock);
- startBlock = startBlock.getNextDataRefNode();
- }
- addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, endBlock);
- LOGGER.info("Total Time in retrieving the data reference node" + "after scanning the btree " + (
- System.currentTimeMillis() - startTimeInMillis)
- + " Total number of data reference node for executing filter(s) " + listOfDataBlocksToScan
- .size());
-
- return listOfDataBlocksToScan;
- }
-
- private String joinByteArray(byte[] bytes, char delimiter) {
- StringBuffer byteArrayAsString = new StringBuffer();
- byteArrayAsString.append("");
- if (null != bytes) {
- for (int i = 0; i < bytes.length; i++) {
- byteArrayAsString.append(delimiter).append(bytes[i]);
- }
- if (byteArrayAsString.length() > 0) {
- return byteArrayAsString.substring(1);
- }
- }
- return null;
- }
-
- /**
* Get the map of required partitions
* The value of "1" in BitSet represent the required partition
* @param expressionTree
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java
index dbb6eb7..1ae795e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java
@@ -19,10 +19,7 @@ package org.apache.carbondata.core.scan.filter;
import java.io.IOException;
import java.util.BitSet;
-import java.util.List;
-import org.apache.carbondata.core.datastore.DataRefNode;
-import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.scan.expression.Expression;
@@ -45,17 +42,6 @@ public interface FilterProcessor {
throws FilterUnsupportedException, IOException;
/**
- * This API is exposed inorder to get the required block reference node
- * based on the filter.The block list will be send to the executer tasks inorder
- * to apply filters.
- *
- * @param filterResolver DataBlock list with resolved filters
- * @return list of DataRefNode.
- */
- List<DataRefNode> getFilterredBlocks(DataRefNode dataRefNode, FilterResolverIntf filterResolver,
- AbstractIndex segmentIndexBuilder);
-
- /**
* This API will get the map of required partitions.
* @return BitSet the value "1" represent the required partition.
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java b/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java
index 0617af3..7c48f37 100644
--- a/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/cache/CacheProviderTest.java
@@ -25,10 +25,8 @@ import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentif
import org.apache.carbondata.core.cache.dictionary.ForwardDictionaryCache;
import org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.BlockIndexStore;
import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
-import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
import org.apache.carbondata.core.util.CarbonProperties;
import org.junit.Before;
@@ -101,19 +99,5 @@ public class CacheProviderTest {
assertEquals(1024 * 1024 * Integer.parseInt(driverCacheSize), lruCacheMemorySize);
// drop cache
cacheProvider.dropAllCache();
- // validation test for the executor memory.
- CarbonProperties.getInstance().addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false");
- Cache<TableBlockUniqueIdentifier, BlockIndexStore> executorCache =
- cacheProvider.createCache(CacheType.EXECUTOR_BTREE);
- carbonLRUCacheField = BlockIndexStore.class.getSuperclass().getDeclaredField("lruCache");
- carbonLRUCacheField.setAccessible(true);
- carbonLRUCache = (CarbonLRUCache) carbonLRUCacheField.get(executorCache);
- lruCacheMemorySizeField = CarbonLRUCache.class.getDeclaredField("lruCacheMemorySize");
- lruCacheMemorySizeField.setAccessible(true);
- lruCacheMemorySize = (long) lruCacheMemorySizeField.get(carbonLRUCache);
- String executorCacheSize = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE);
- assertEquals(1024 * 1024 * Integer.parseInt(executorCacheSize), lruCacheMemorySize);
- cacheProvider.dropAllCache();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockIndexTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockIndexTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockIndexTest.java
deleted file mode 100644
index f252a70..0000000
--- a/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockIndexTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.datastore.block;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.BTreeBuilderInfo;
-import org.apache.carbondata.core.datastore.impl.btree.BlockletBTreeBuilder;
-import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
-import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-
-import mockit.Mock;
-import mockit.MockUp;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static junit.framework.TestCase.assertEquals;
-
-public class BlockIndexTest {
-
- private static SegmentInfo segmentInfo;
- private static DataFileFooter footer;
- private static ColumnSchema columnSchema;
- private static BlockletInfo blockletInfo;
- private static BlockletIndex blockletIndex;
- private static List<DataFileFooter> footerList = new ArrayList<DataFileFooter>();
- private static List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
-
- @BeforeClass public static void setUp() {
- segmentInfo = new SegmentInfo();
- footer = new DataFileFooter();
- columnSchema = new ColumnSchema();
- blockletInfo = new BlockletInfo();
- blockletIndex = new BlockletIndex();
- }
-
- @Test public void testBuild() {
- segmentInfo = new SegmentInfo();
- new MockUp<BlockletBTreeBuilder>() {
- @Mock public void build(BTreeBuilderInfo segmentBuilderInfos) {
- }
- };
- int expectedValue = 0;
- BlockIndex blockIndex = new BlockIndex();
- columnSchema.setColumnName("employeeName");
- columnSchemaList.add(new ColumnSchema());
-
- footer.setSegmentInfo(segmentInfo);
- footer.setColumnInTable(columnSchemaList);
- footer.setBlockletList(Arrays.asList(blockletInfo));
- footerList.add(footer);
-
- blockIndex.buildIndex(footerList);
- assertEquals(footerList.get(0).getNumberOfRows(), expectedValue);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/Index.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/Index.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/Index.java
deleted file mode 100644
index bd0b809..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/Index.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.internal.index;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-
-import org.apache.hadoop.mapreduce.JobContext;
-
-/**
- * An index is associate with one segment, it is used for filtering on files in the segment.
- */
-public interface Index {
-
- /**
- * Index name
- * @return index name
- */
- String getName();
-
- /**
- * Used to filter blocks based on filter
- * @param job job
- * @param filter filter
- * @return filtered block
- * @throws IOException
- */
- List<Block> filter(JobContext job, FilterResolverIntf filter) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/IndexLoader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/IndexLoader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/IndexLoader.java
deleted file mode 100644
index ac8a848..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/IndexLoader.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.internal.index;
-
-import org.apache.carbondata.hadoop.internal.segment.Segment;
-
-/**
- * Used to load the index
- */
-public interface IndexLoader {
-
- /**
- * load the index based on the index information set in the configuration
- * @param segment the index from which segment to load
- * @return the loaded index
- */
- Index load(Segment segment);
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
deleted file mode 100644
index 0fe0cbf..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.internal.index.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.datastore.DataRefNode;
-import org.apache.carbondata.core.datastore.DataRefNodeFinder;
-import org.apache.carbondata.core.datastore.IndexKey;
-import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
-import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
-import org.apache.carbondata.core.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.datastore.block.BlockletInfos;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
-import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.stats.QueryStatistic;
-import org.apache.carbondata.core.stats.QueryStatisticsConstants;
-import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.hadoop.CacheClient;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.internal.index.Block;
-import org.apache.carbondata.hadoop.internal.index.Index;
-import org.apache.carbondata.hadoop.internal.segment.Segment;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-
-class InMemoryBTreeIndex implements Index {
-
- private static final Log LOG = LogFactory.getLog(InMemoryBTreeIndex.class);
- private Segment segment;
-
- InMemoryBTreeIndex(Segment segment) {
- this.segment = segment;
- }
-
- @Override
- public String getName() {
- return null;
- }
-
- @Override
- public List<Block> filter(JobContext job, FilterResolverIntf filter)
- throws IOException {
-
- List<Block> result = new LinkedList<>();
-
- FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
-
- AbsoluteTableIdentifier identifier = AbsoluteTableIdentifier.from(segment.getPath(), "", "");
-
- //for this segment fetch blocks matching filter in BTree
- List<DataRefNode> dataRefNodes =
- getDataBlocksOfSegment(job, filterExpressionProcessor, identifier, filter);
- for (DataRefNode dataRefNode : dataRefNodes) {
- BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
- TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
- result.add(new CarbonInputSplit(segment.getId(),
- tableBlockInfo.getDetailInfo().getBlockletId().toString(),
- new Path(tableBlockInfo.getFilePath()), tableBlockInfo.getBlockOffset(),
- tableBlockInfo.getBlockLength(), tableBlockInfo.getLocations(),
- tableBlockInfo.getBlockletInfos().getNoOfBlockLets(), tableBlockInfo.getVersion(), null));
- }
- return result;
- }
-
- private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
- JobContext job, AbsoluteTableIdentifier identifier) throws IOException {
- Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
- CacheClient cacheClient = new CacheClient();
- TableSegmentUniqueIdentifier segmentUniqueIdentifier =
- new TableSegmentUniqueIdentifier(identifier, segment.getId());
- try {
- SegmentTaskIndexWrapper segmentTaskIndexWrapper =
- cacheClient.getSegmentAccessClient().getIfPresent(segmentUniqueIdentifier);
- if (null != segmentTaskIndexWrapper) {
- segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
- }
- // if segment tree is not loaded, load the segment tree
- if (segmentIndexMap == null) {
- List<TableBlockInfo> tableBlockInfoList = getTableBlockInfo(job);
- Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
- segmentToTableBlocksInfos.put(segment.getId(), tableBlockInfoList);
- segmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
- // TODO: loadAndGetTaskIdToSegmentsMap can be optimized, use tableBlockInfoList as input
- // get Btree blocks for given segment
- segmentTaskIndexWrapper = cacheClient.getSegmentAccessClient().get(segmentUniqueIdentifier);
- segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
- }
- } finally {
- cacheClient.close();
- }
- return segmentIndexMap;
- }
-
- /**
- * Below method will be used to get the table block info
- *
- * @param job job context
- * @return list of table block
- * @throws IOException
- */
- private List<TableBlockInfo> getTableBlockInfo(JobContext job) throws IOException {
- List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
-
- // identify table blocks from all file locations of given segment
- for (InputSplit inputSplit : segment.getAllSplits(job)) {
- CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
- BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
- carbonInputSplit.getNumberOfBlocklets());
- tableBlockInfoList.add(
- new TableBlockInfo(carbonInputSplit.getPath().toString(),
- carbonInputSplit.getBlockletId(),carbonInputSplit.getStart(), segment.getId(),
- carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
- blockletInfos, carbonInputSplit.getVersion(),
- carbonInputSplit.getDeleteDeltaFiles()));
- }
- return tableBlockInfoList;
- }
-
- /**
- * get data blocks of given segment
- */
- private List<DataRefNode> getDataBlocksOfSegment(JobContext job,
- FilterExpressionProcessor filterExpressionProcessor, AbsoluteTableIdentifier identifier,
- FilterResolverIntf resolver) throws IOException {
-
- QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
- QueryStatistic statistic = new QueryStatistic();
- Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
- getSegmentAbstractIndexs(job, identifier);
-
- List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
-
- // build result
- for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
-
- List<DataRefNode> filterredBlocks = null;
- // if no filter is given get all blocks from Btree Index
- if (null == resolver) {
- filterredBlocks = getDataBlocksOfIndex(abstractIndex);
- } else {
- // apply filter and get matching blocks
- filterredBlocks = filterExpressionProcessor.getFilterredBlocks(
- abstractIndex.getDataRefNode(),
- resolver,
- abstractIndex);
- }
- resultFilterredBlocks.addAll(filterredBlocks);
- }
- statistic.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER,
- System.currentTimeMillis());
- recorder.recordStatistics(statistic);
- recorder.logStatistics();
- return resultFilterredBlocks;
- }
-
- /**
- * get data blocks of given btree
- */
- private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
- List<DataRefNode> blocks = new LinkedList<DataRefNode>();
- SegmentProperties segmentProperties = abstractIndex.getSegmentProperties();
-
- try {
- IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
- IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
-
- // Add all blocks of btree into result
- DataRefNodeFinder blockFinder =
- new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize(),
- segmentProperties.getNumberOfSortColumns(),
- segmentProperties.getNumberOfNoDictSortColumns());
- DataRefNode startBlock =
- blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey);
- DataRefNode endBlock =
- blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey);
- while (startBlock != endBlock) {
- blocks.add(startBlock);
- startBlock = startBlock.getNextDataRefNode();
- }
- blocks.add(endBlock);
-
- } catch (KeyGenException e) {
- LOG.error("Could not generate start key", e);
- }
- return blocks;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndexLoader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndexLoader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndexLoader.java
deleted file mode 100644
index 2369dec..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndexLoader.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.internal.index.impl;
-
-import org.apache.carbondata.hadoop.internal.index.Index;
-import org.apache.carbondata.hadoop.internal.index.IndexLoader;
-import org.apache.carbondata.hadoop.internal.segment.Segment;
-
-public class InMemoryBTreeIndexLoader implements IndexLoader {
- @Override
- public Index load(Segment segment) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/SegmentManager.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/SegmentManager.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/SegmentManager.java
deleted file mode 100644
index 420a8f2..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/SegmentManager.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.internal.segment;
-
-import java.io.IOException;
-
-/**
- * Segment manager is used to manage all segment within one table.
- * It manages segment state internally in a transactional manner.
- */
-public interface SegmentManager {
-
- /**
- * Used for getting all segments for scan
- */
- Segment[] getAllValidSegments();
-
- /**
- * Used for data load
- * caller should open new segment by calling this, after data success, call commitSegment,
- * call closeSegment if any failure
- */
- Segment openNewSegment() throws IOException;
-
- /**
- * Call this function when data load or compaction is completed successfully.
- */
- void commitSegment(Segment segment) throws IOException;
-
- /**
- * Call this function when data load or compaction failed.
- */
- void closeSegment(Segment segment) throws IOException;
-
- /**
- * Delete this segment
- */
- void deleteSegment(Segment segment) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/SegmentManagerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/SegmentManagerFactory.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/SegmentManagerFactory.java
deleted file mode 100644
index 2ca7bb2..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/SegmentManagerFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.internal.segment;
-
-/**
- * Used to get the global segment manager instance
- */
-public class SegmentManagerFactory {
- public static SegmentManager getGlobalSegmentManager() {
- //TODO return the default implementation
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/impl/IndexedSegment.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/impl/IndexedSegment.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/impl/IndexedSegment.java
deleted file mode 100644
index 024a504..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/impl/IndexedSegment.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.internal.segment.impl;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.hadoop.internal.index.Block;
-import org.apache.carbondata.hadoop.internal.index.Index;
-import org.apache.carbondata.hadoop.internal.index.IndexLoader;
-import org.apache.carbondata.hadoop.internal.segment.Segment;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-
-/**
- * This segment is backed by index, thus getSplits can use the index to do file pruning.
- */
-public class IndexedSegment extends Segment {
-
- private IndexLoader loader;
-
- public IndexedSegment(String name, String path, IndexLoader loader) {
- super(name, path);
- this.loader = loader;
- }
-
- @Override
- public List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver)
- throws IOException {
- // do as following
- // 1. create the index or get from cache by the filter name in the configuration
- // 2. filter by index to get the filtered block
- // 3. create input split from filtered block
-
- List<InputSplit> output = new LinkedList<>();
- Index index = loader.load(this);
- List<Block> blocks = index.filter(job, filterResolver);
- for (Block block: blocks) {
- output.add(makeInputSplit(block));
- }
- return output;
- }
-
- @Override
- public void setupForRead(JobContext job) throws IOException {
-
- }
-
- private InputSplit makeInputSplit(Block block) {
- // TODO: get all required parameter from block
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/61788353/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/impl/zk/ZkSegmentManager.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/impl/zk/ZkSegmentManager.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/impl/zk/ZkSegmentManager.java
deleted file mode 100644
index 12898e7..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/segment/impl/zk/ZkSegmentManager.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.internal.segment.impl.zk;
-
-import java.io.IOException;
-
-import org.apache.carbondata.hadoop.internal.segment.Segment;
-import org.apache.carbondata.hadoop.internal.segment.SegmentManager;
-
-/**
- * This class leverage Zookeeper and HDFS file to manage segments.
- * Zookeeper is used for locking, and HDFS is for storing the state of segments.
- */
-public class ZkSegmentManager implements SegmentManager {
- @Override
- public Segment[] getAllValidSegments() {
- return new Segment[0];
- }
-
- @Override
- public Segment openNewSegment() throws IOException {
- return null;
- }
-
- @Override
- public void commitSegment(Segment segment) throws IOException {
-
- }
-
- @Override
- public void closeSegment(Segment segment) throws IOException {
-
- }
-
- @Override
- public void deleteSegment(Segment segment) throws IOException {
-
- }
-}