You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/15 07:09:32 UTC

[47/52] [partial] incubator-carbondata git commit: Renamed packages to org.apache.carbondata and fixed errors

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
new file mode 100644
index 0000000..be48ce5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.carbon.datastore.block.BlockIndex;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.CarbonUtilException;
+
+/**
+ * Singleton class to handle loading, unloading, clearing and storing of the
+ * table blocks
+ */
+public class BlockIndexStore {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockIndexStore.class.getName());
+  /**
+   * singleton instance
+   */
+  private static final BlockIndexStore CARBONTABLEBLOCKSINSTANCE = new BlockIndexStore();
+
+  /**
+   * map to hold the table and its list of blocks
+   */
+  private Map<AbsoluteTableIdentifier, Map<TableBlockInfo, AbstractIndex>> tableBlocksMap;
+
+  /**
+   * map of block info to lock object map, while loading the btree this will be filled
+   * and removed after loading the tree for that particular block info. The loader holds
+   * this lock for the whole load, so only block level lock will be applied and another
+   * block can be loaded concurrently
+   */
+  private Map<TableBlockInfo, Object> blockInfoLock;
+
+  /**
+   * table and its lock object, this will be useful in case of concurrent
+   * query scenario when more than one query comes for same table and in that
+   * case it will ensure that only one query creates the block map of the table
+   */
+  private Map<AbsoluteTableIdentifier, Object> tableLockMap;
+
+  private BlockIndexStore() {
+    tableBlocksMap =
+        new ConcurrentHashMap<AbsoluteTableIdentifier, Map<TableBlockInfo, AbstractIndex>>(
+            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    tableLockMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Object>(
+        CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    blockInfoLock = new ConcurrentHashMap<TableBlockInfo, Object>();
+  }
+
+  /**
+   * Return the instance of this class
+   *
+   * @return singleton instance
+   */
+  public static BlockIndexStore getInstance() {
+    return CARBONTABLEBLOCKSINSTANCE;
+  }
+
+  /**
+   * below method will be used to load the block which are not loaded and to
+   * get the loaded blocks if all the blocks which are passed is loaded then
+   * it will not load , else it will load. The passed list is sorted in place
+   * and the returned blocks follow that sorted order.
+   *
+   * @param tableBlocksInfos        list of blocks to be loaded
+   * @param absoluteTableIdentifier absolute Table Identifier to identify the table
+   * @return loaded blocks, one for each passed block info, in sorted order
+   * @throws IndexBuilderException if a block cannot be loaded or the thread is interrupted
+   */
+  public List<AbstractIndex> loadAndGetBlocks(List<TableBlockInfo> tableBlocksInfos,
+      AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException {
+    AbstractIndex[] loadedBlock = new AbstractIndex[tableBlocksInfos.size()];
+    // keep the lock object returned here instead of reading the lock map again,
+    // as clear() may remove the entry in between and leave nothing to lock on
+    Object lockObject = addTableLockObject(absoluteTableIdentifier);
+    // sort the block info
+    // so block will be loaded in sorted order this will be required for
+    // query execution
+    Collections.sort(tableBlocksInfos);
+    Map<TableBlockInfo, AbstractIndex> tableBlockMapTemp;
+    // Acquire the lock to ensure only one query creates the block map
+    // of the table
+    synchronized (lockObject) {
+      tableBlockMapTemp = tableBlocksMap.get(absoluteTableIdentifier);
+      // if it is loading for first time
+      if (null == tableBlockMapTemp) {
+        tableBlockMapTemp = new ConcurrentHashMap<TableBlockInfo, AbstractIndex>();
+        tableBlocksMap.put(absoluteTableIdentifier, tableBlockMapTemp);
+      }
+    }
+    List<Future<AbstractIndex>> blocksList = new ArrayList<Future<AbstractIndex>>();
+    ExecutorService executor = Executors.newFixedThreadPool(getNumberOfCores());
+    try {
+      int counter = -1;
+      for (TableBlockInfo blockInfo : tableBlocksInfos) {
+        counter++;
+        AbstractIndex tableBlock = tableBlockMapTemp.get(blockInfo);
+        if (null == tableBlock) {
+          // block is not loaded yet: leave its position empty and submit a loader, so
+          // the empty positions and the futures stay in the same order. The loader
+          // checks again under the block level lock before it really loads
+          blocksList.add(executor.submit(new BlockLoaderThread(blockInfo, tableBlockMapTemp)));
+        } else {
+          // if blocks is already loaded then directly set the block at particular
+          // position so block will be present in sorted order
+          loadedBlock[counter] = tableBlock;
+        }
+      }
+    } finally {
+      // shutdown the executor gracefully, also when submitting a task failed,
+      // so that the pool threads are always released
+      executor.shutdown();
+    }
+    // wait until all the task is finished
+    try {
+      executor.awaitTermination(1, TimeUnit.HOURS);
+    } catch (InterruptedException e) {
+      // keep the interrupt status visible to the caller
+      Thread.currentThread().interrupt();
+      throw new IndexBuilderException(e);
+    }
+    // fill the block which were not loaded before to loaded blocks array
+    fillLoadedBlocks(loadedBlock, blocksList);
+    return Arrays.asList(loadedBlock);
+  }
+
+  /**
+   * Reads the configured number of cores used to size the loader thread pool
+   *
+   * @return configured number of cores, never less than one
+   */
+  private int getNumberOfCores() {
+    int numberOfCores;
+    try {
+      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.NUM_CORES,
+              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+    } catch (NumberFormatException e) {
+      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+    }
+    // a fixed thread pool cannot be created with less than one thread
+    return Math.max(1, numberOfCores);
+  }
+
+  /**
+   * Below method will be used to fill the loaded blocks to the array
+   * which will be used for query execution
+   *
+   * @param loadedBlockArray array of blocks which will be filled
+   * @param blocksList       blocks loaded in thread, in the order of the empty positions
+   * @throws IndexBuilderException in case of any failure
+   */
+  private void fillLoadedBlocks(AbstractIndex[] loadedBlockArray,
+      List<Future<AbstractIndex>> blocksList) throws IndexBuilderException {
+    int blockCounter = 0;
+    for (int i = 0; i < loadedBlockArray.length; i++) {
+      if (null == loadedBlockArray[i]) {
+        try {
+          loadedBlockArray[i] = blocksList.get(blockCounter++).get();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IndexBuilderException(e);
+        } catch (ExecutionException e) {
+          throw new IndexBuilderException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Loads the index of one block, unless another query loaded it meanwhile
+   *
+   * @param tableBlockMapTemp block info to block index map of the table
+   * @param blockInfo         block to be loaded
+   * @return index of the block
+   * @throws CarbonUtilException in case of failure while loading the block
+   */
+  private AbstractIndex loadBlock(Map<TableBlockInfo, AbstractIndex> tableBlockMapTemp,
+      TableBlockInfo blockInfo) throws CarbonUtilException {
+    // acquire the lock for particular block info and hold it during the whole load, so
+    // a concurrent query for the same block waits here and then finds the block loaded
+    synchronized (getBlockLockObject(blockInfo)) {
+      try {
+        // check again whether block is present or not to avoid the same block
+        // is loaded more than once in case of concurrent query
+        AbstractIndex tableBlock = tableBlockMapTemp.get(blockInfo);
+        if (null == tableBlock) {
+          // getting the data file meta data of the block
+          DataFileFooter footer = CarbonUtil.readMetadatFile(blockInfo.getFilePath(),
+              blockInfo.getBlockOffset(), blockInfo.getBlockLength());
+          tableBlock = new BlockIndex();
+          footer.setTableBlockInfo(blockInfo);
+          // building the block
+          tableBlock.buildIndex(Arrays.asList(footer));
+          tableBlockMapTemp.put(blockInfo, tableBlock);
+        }
+        return tableBlock;
+      } finally {
+        // remove the lock object once the block is handled, also when loading failed,
+        // otherwise the entry would stay in the lock map
+        blockInfoLock.remove(blockInfo);
+      }
+    }
+  }
+
+  /**
+   * Method to get the block level lock object, it is added if not present
+   *
+   * @param blockInfo block info to be locked
+   * @return lock object of the block info
+   */
+  private Object getBlockLockObject(TableBlockInfo blockInfo) {
+    Object blockInfoLockObject = blockInfoLock.get(blockInfo);
+    // added double checking mechanism to add the lock object so in case of concurrent
+    // query for same block info only one lock object will be added
+    if (null == blockInfoLockObject) {
+      synchronized (blockInfoLock) {
+        blockInfoLockObject = blockInfoLock.get(blockInfo);
+        if (null == blockInfoLockObject) {
+          blockInfoLockObject = new Object();
+          blockInfoLock.put(blockInfo, blockInfoLockObject);
+        }
+      }
+    }
+    return blockInfoLockObject;
+  }
+
+  /**
+   * Method to add table level lock if lock is not present for the table
+   *
+   * @param absoluteTableIdentifier table to get the lock object for
+   * @return lock object of the table
+   */
+  private synchronized Object addTableLockObject(
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    // add the instance to lock map if it is not present
+    Object lockObject = tableLockMap.get(absoluteTableIdentifier);
+    if (null == lockObject) {
+      lockObject = new Object();
+      tableLockMap.put(absoluteTableIdentifier, lockObject);
+    }
+    return lockObject;
+  }
+
+  /**
+   * This will be used to remove a particular blocks useful in case of
+   * deletion of some of the blocks in case of retention or may be some other
+   * scenario
+   *
+   * @param removeTableBlocksInfos  blocks to be removed
+   * @param absoluteTableIdentifier absolute table identifier
+   */
+  public void removeTableBlocks(List<TableBlockInfo> removeTableBlocksInfos,
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    // get the lock object if lock object is not present then it is not
+    // loaded at all
+    // we can return from here
+    Object lockObject = tableLockMap.get(absoluteTableIdentifier);
+    if (null == lockObject) {
+      return;
+    }
+    Map<TableBlockInfo, AbstractIndex> map = tableBlocksMap.get(absoluteTableIdentifier);
+    // if there is no loaded blocks then return
+    if (null == map) {
+      return;
+    }
+    for (TableBlockInfo blockInfos : removeTableBlocksInfos) {
+      map.remove(blockInfos);
+    }
+  }
+
+  /**
+   * remove all the details of a table this will be used in case of drop table
+   *
+   * @param absoluteTableIdentifier absolute table identifier to find the table
+   */
+  public void clear(AbsoluteTableIdentifier absoluteTableIdentifier) {
+    // removing all the details of table
+    tableLockMap.remove(absoluteTableIdentifier);
+    tableBlocksMap.remove(absoluteTableIdentifier);
+  }
+
+  /**
+   * Thread class which will be used to load the blocks
+   */
+  private class BlockLoaderThread implements Callable<AbstractIndex> {
+    /**
+     * table block info to block index map
+     */
+    private final Map<TableBlockInfo, AbstractIndex> tableBlockMap;
+
+    // block info
+    private final TableBlockInfo blockInfo;
+
+    private BlockLoaderThread(TableBlockInfo blockInfo,
+        Map<TableBlockInfo, AbstractIndex> tableBlockMap) {
+      this.tableBlockMap = tableBlockMap;
+      this.blockInfo = blockInfo;
+    }
+
+    @Override public AbstractIndex call() throws Exception {
+      // load the block under its block level lock and return the loaded block
+      return loadBlock(tableBlockMap, blockInfo);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BtreeBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BtreeBuilder.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BtreeBuilder.java
new file mode 100644
index 0000000..fb59607
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BtreeBuilder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore;
+
+/**
+ * Below interface will be used to build the index in some data structure
+ */
+public interface BtreeBuilder {
+
+  /**
+   * Below method will be used to store the leaf collection in some data structure
+   * @param blocksBuilderInfos builder info object passed to the implementation
+   */
+  void build(BTreeBuilderInfo blocksBuilderInfos);
+
+  /**
+   * Below method will be used to get the first data block
+   *
+   * @return data block
+   */
+  DataRefNode get();
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java
new file mode 100644
index 0000000..e81a9a6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore;
+
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastorage.store.FileHolder;
+
+/**
+ * Interface data block reference
+ */
+public interface DataRefNode {
+
+  /**
+   * Method to get the next block, this can be used while scanning when
+   * iterator of this class is used to iterate over blocks
+   *
+   * @return next block
+   */
+  DataRefNode getNextDataRefNode();
+
+  /**
+   * to get the number of keys tuples present in the block
+   *
+   * @return number of keys in the block
+   */
+  int nodeSize();
+
+  /**
+   * Method can be used to get the block index. This can be used when multiple
+   * threads scan a group of blocks, in that case we can assign
+   * some of the blocks to one thread and some to other
+   *
+   * @return block number
+   */
+  long nodeNumber();
+
+  /**
+   * This method will be used to get the max value of all the columns this can
+   * be used in case of filter query
+   *
+   * @return max value of all the columns
+   */
+  byte[][] getColumnsMaxValue();
+
+  /**
+   * This method will be used to get the min value of all the columns this can
+   * be used in case of filter query
+   *
+   * @return min value of all the columns
+   */
+  byte[][] getColumnsMinValue();
+
+  /**
+   * Below method will be used to get the dimension chunks
+   *
+   * @param fileReader   file reader to read the chunks from file
+   * @param blockIndexes indexes of the blocks need to be read
+   * @return dimension data chunks
+   */
+  DimensionColumnDataChunk[] getDimensionChunks(FileHolder fileReader, int[] blockIndexes);
+
+  /**
+   * Below method will be used to get the dimension chunk
+   *
+   * @param fileReader   file reader to read the chunk from file
+   * @param blockIndexes block index to be read
+   * @return dimension data chunk
+   */
+  DimensionColumnDataChunk getDimensionChunk(FileHolder fileReader, int blockIndexes);
+
+  /**
+   * Below method will be used to get the measure chunks
+   *
+   * @param fileReader   file reader to read the chunk from file
+   * @param blockIndexes block indexes to be read from file
+   * @return measure column data chunks
+   */
+  MeasureColumnDataChunk[] getMeasureChunks(FileHolder fileReader, int[] blockIndexes);
+
+  /**
+   * Below method will be used to read the measure chunk
+   *
+   * @param fileReader file reader to read the file chunk
+   * @param blockIndex block index to be read from file
+   * @return measure data chunk
+   */
+  MeasureColumnDataChunk getMeasureChunk(FileHolder fileReader, int blockIndex);
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNodeFinder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNodeFinder.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNodeFinder.java
new file mode 100644
index 0000000..78592f7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNodeFinder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore;
+
+/**
+ * Below Interface is to search a block
+ */
+public interface DataRefNodeFinder {
+
+  /**
+   * Below method will be used to get the first tentative block which matches with
+   * the search key
+   *
+   * @param dataBlocks complete data blocks present
+   * @param searchKey  key to be searched
+   * @return data block
+   */
+  DataRefNode findFirstDataBlock(DataRefNode dataBlocks, IndexKey searchKey);
+
+  /**
+   * Below method will be used to get the last tentative block which matches with
+   * the search key
+   *
+   * @param dataBlocks complete data blocks present
+   * @param searchKey  key to be searched
+   * @return data block
+   */
+  DataRefNode findLastDataBlock(DataRefNode dataBlocks, IndexKey searchKey);
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/IndexKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/IndexKey.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/IndexKey.java
new file mode 100644
index 0000000..cefd32c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/IndexKey.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore;
+
+/**
+ * Index class to store the index of the segment blocklet infos
+ */
+public class IndexKey {
+
+  /**
+   * key which is generated from key generator
+   */
+  private final byte[] dictionaryKeys;
+
+  /**
+   * key which was not generated using key generator
+   * <Index of FirstKey (2 bytes)><Index of SecondKey (2 bytes)><Index of NKey (2 bytes)>
+   * <First Key ByteArray><2nd Key ByteArray><N Key ByteArray>
+   */
+  private final byte[] noDictionaryKeys;
+
+  /**
+   * Creates the index key, a null key array is kept as an empty array
+   *
+   * @param dictionaryKeys   key generated from key generator, can be null
+   * @param noDictionaryKeys key not generated using key generator, can be null
+   */
+  public IndexKey(byte[] dictionaryKeys, byte[] noDictionaryKeys) {
+    this.dictionaryKeys = (null == dictionaryKeys) ? new byte[0] : dictionaryKeys;
+    this.noDictionaryKeys = (null == noDictionaryKeys) ? new byte[0] : noDictionaryKeys;
+  }
+
+  /**
+   * @return the dictionary keys, never null
+   */
+  public byte[] getDictionaryKeys() {
+    return dictionaryKeys;
+  }
+
+  /**
+   * @return the no dictionary keys, never null
+   */
+  public byte[] getNoDictionaryKeys() {
+    return noDictionaryKeys;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
new file mode 100644
index 0000000..50d462a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndex;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath.DataFileUtil;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.CarbonUtilException;
+
+/**
+ * Singleton class to handle loading, removing, clearing and storing of the
+ * segment task indexes of the tables
+ */
+public class SegmentTaskIndexStore {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SegmentTaskIndexStore.class.getName());
+  /**
+   * singleton instance
+   */
+  private static final SegmentTaskIndexStore SEGMENTTASKINDEXSTORE = new SegmentTaskIndexStore();
+
+  /**
+   * mapping of table identifier to map of segment id to map of task id to
+   * index. So many maps are needed as each segment can have multiple tasks
+   * and the blocks of each task are indexed together in their own btree
+   */
+  private Map<AbsoluteTableIdentifier, Map<String, Map<String, AbstractIndex>>> tableSegmentMap;
+
+  /**
+   * map of segment id to lock object, an entry is added while the index of that
+   * segment is being loaded and removed after it is loaded. This is useful while
+   * loading concurrently as only a segment level lock is applied, so another
+   * segment can be loaded concurrently
+   */
+  private Map<String, Object> segmentLockMap;
+
+  /**
+   * table and its lock object, this is useful in case of concurrent query
+   * scenario when more than one query comes for the same table, in that case
+   * it ensures that only one query creates the segment map of the table
+   */
+  private Map<AbsoluteTableIdentifier, Object> tableLockMap;
+
+  private SegmentTaskIndexStore() {
+    tableSegmentMap =
+        new ConcurrentHashMap<AbsoluteTableIdentifier, Map<String, Map<String, AbstractIndex>>>(
+            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    tableLockMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Object>(
+        CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    segmentLockMap = new ConcurrentHashMap<String, Object>();
+  }
+
+  /**
+   * Return the instance of this class
+   *
+   * @return singleton instance
+   */
+  public static SegmentTaskIndexStore getInstance() {
+    return SEGMENTTASKINDEXSTORE;
+  }
+
+  /**
+   * Loads the index of every given segment which is not loaded yet (one index per
+   * task id, as one segment may have multiple tasks) and returns the task id to
+   * index entries of all the given segments, freshly loaded or already loaded.
+   * NOTE: the result is keyed by task id only, equal task ids of different segments collide
+   *
+   * @param segmentToTableBlocksInfos segment id to block info
+   * @param absoluteTableIdentifier   absolute table identifier
+   * @return map of task id to segment index
+   * @throws IndexBuilderException if loading the blocks fails with a CarbonUtilException
+   */
+  public Map<String, AbstractIndex> loadAndGetTaskIdToSegmentsMap(
+      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos,
+      AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException {
+    // task id to segment map
+    Map<String, AbstractIndex> taskIdToTableSegmentMap =
+        new HashMap<String, AbstractIndex>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    addLockObject(absoluteTableIdentifier);
+    Iterator<Entry<String, List<TableBlockInfo>>> iteratorOverSegmentBlocksInfos =
+        segmentToTableBlocksInfos.entrySet().iterator();
+    Map<String, Map<String, AbstractIndex>> tableSegmentMapTemp =
+        addTableSegmentMap(absoluteTableIdentifier);
+    Map<String, AbstractIndex> taskIdToSegmentIndexMap = null;
+    String segmentId = null;
+    String taskId = null;
+    try {
+      while (iteratorOverSegmentBlocksInfos.hasNext()) {
+        // segment id to table block mapping
+        Entry<String, List<TableBlockInfo>> next = iteratorOverSegmentBlocksInfos.next();
+        // group only the blocks of this segment by task id, so that the index of
+        // a segment is never built from the blocks of the other given segments
+        Map<String, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
+            mappedAndGetTaskIdToTableBlockInfo(
+                java.util.Collections.singletonMap(next.getKey(), next.getValue()));
+        segmentId = next.getKey();
+        // load the segment only if it is not loaded already
+        taskIdToSegmentIndexMap = tableSegmentMapTemp.get(segmentId);
+        if (taskIdToSegmentIndexMap == null) {
+          // get the segment loader lock object this is to avoid
+          // same segment is getting loaded multiple times
+          // in case of concurrent query
+          Object segmentLoderLockObject = segmentLockMap.get(segmentId);
+          if (null == segmentLoderLockObject) {
+            segmentLoderLockObject = addAndGetSegmentLock(segmentId);
+          }
+          // acquire lock to load the segment
+          synchronized (segmentLoderLockObject) {
+            taskIdToSegmentIndexMap = tableSegmentMapTemp.get(segmentId);
+            if (null == taskIdToSegmentIndexMap) {
+              // creating a map of task id to table segment
+              taskIdToSegmentIndexMap = new HashMap<String, AbstractIndex>();
+              Iterator<Entry<String, List<TableBlockInfo>>> iterator =
+                  taskIdToTableBlockInfoMap.entrySet().iterator();
+              while (iterator.hasNext()) {
+                Entry<String, List<TableBlockInfo>> taskToBlockInfoList = iterator.next();
+                taskId = taskToBlockInfoList.getKey();
+                taskIdToSegmentIndexMap.put(taskId,
+                    loadBlocks(taskId, taskToBlockInfoList.getValue(), absoluteTableIdentifier));
+              }
+              tableSegmentMapTemp.put(next.getKey(), taskIdToSegmentIndexMap);
+              // the segment is loaded now so its lock object is not required any more,
+              // a concurrent query waiting on the lock for the same segment will
+              // find the loaded segment once it has acquired the lock
+              segmentLockMap.remove(segmentId);
+            }
+          }
+        }
+        // add to the result for freshly loaded as well as for already loaded
+        // segments, otherwise an already loaded segment would return no index
+        taskIdToTableSegmentMap.putAll(taskIdToSegmentIndexMap);
+      }
+    } catch (CarbonUtilException e) {
+      LOGGER.error("Problem while loading the segment");
+      throw new IndexBuilderException(e);
+    }
+    return taskIdToTableSegmentMap;
+  }
+
+  /**
+   * Below method will be used to get the segment level lock object
+   *
+   * @param segmentId
+   * @return lock object
+   */
+  private synchronized Object addAndGetSegmentLock(String segmentId) {
+    // get the segment lock object if it is present then return
+    // otherwise add the new lock and return
+    Object segmentLoderLockObject = segmentLockMap.get(segmentId);
+    if (null == segmentLoderLockObject) {
+      segmentLoderLockObject = new Object();
+      segmentLockMap.put(segmentId, segmentLoderLockObject);
+    }
+    return segmentLoderLockObject;
+  }
+
+  /**
+   * Below method will add a lock object for the table to the table lock map
+   * if it is not present
+   *
+   * @param absoluteTableIdentifier
+   */
+  private synchronized void addLockObject(AbsoluteTableIdentifier absoluteTableIdentifier) {
+    // add the instance to lock map if it is not present
+    if (null == tableLockMap.get(absoluteTableIdentifier)) {
+      tableLockMap.put(absoluteTableIdentifier, new Object());
+    }
+  }
+
+  /**
+   * Below method will be used to get the table segment map
+   * if table segment is not present then it will add and return
+   *
+   * @param absoluteTableIdentifier
+   * @return table segment map
+   */
+  private Map<String, Map<String, AbstractIndex>> addTableSegmentMap(
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    // get the instance of lock object
+    Object lockObject = tableLockMap.get(absoluteTableIdentifier);
+    Map<String, Map<String, AbstractIndex>> tableSegmentMapTemp =
+        tableSegmentMap.get(absoluteTableIdentifier);
+    if (null == tableSegmentMapTemp) {
+      synchronized (lockObject) {
+        // segment id to task id to table segment map
+        tableSegmentMapTemp = tableSegmentMap.get(absoluteTableIdentifier);
+        if (null == tableSegmentMapTemp) {
+          tableSegmentMapTemp = new ConcurrentHashMap<String, Map<String, AbstractIndex>>();
+          tableSegmentMap.put(absoluteTableIdentifier, tableSegmentMapTemp);
+        }
+      }
+    }
+    return tableSegmentMapTemp;
+  }
+
+  /**
+   * Below method will be used to load the blocks of one task (taskId) of a table
+   *
+   * @param tableBlockInfoList
+   * @return loaded segment
+   * @throws CarbonUtilException
+   */
+  private AbstractIndex loadBlocks(String taskId, List<TableBlockInfo> tableBlockInfoList,
+      AbsoluteTableIdentifier tableIdentifier) throws CarbonUtilException {
+    // all the block of one task id will be loaded together
+    // so creating a list which will have all the data file meta data of one task
+    List<DataFileFooter> footerList =
+        CarbonUtil.readCarbonIndexFile(taskId, tableBlockInfoList, tableIdentifier);
+    AbstractIndex segment = new SegmentTaskIndex();
+    // the index of the task is built from the footers which were read
+    // from the carbon index file above
+    segment.buildIndex(footerList);
+    return segment;
+  }
+
+  /**
+   * Below method will be used to get the task id to all the table block info belongs to
+   * that task id mapping
+   *
+   * @param segmentToTableBlocksInfos segment id to table blocks info map
+   * @return task id to table block info mapping
+   */
+  private Map<String, List<TableBlockInfo>> mappedAndGetTaskIdToTableBlockInfo(
+      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos) {
+    Map<String, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
+        new HashMap<String, List<TableBlockInfo>>();
+    Iterator<Entry<String, List<TableBlockInfo>>> iterator =
+        segmentToTableBlocksInfos.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<String, List<TableBlockInfo>> next = iterator.next();
+      List<TableBlockInfo> value = next.getValue();
+      for (TableBlockInfo blockInfo : value) {
+        String taskNo = DataFileUtil.getTaskNo(blockInfo.getFilePath());
+        List<TableBlockInfo> list = taskIdToTableBlockInfoMap.get(taskNo);
+        if (null == list) {
+          list = new ArrayList<TableBlockInfo>();
+          taskIdToTableBlockInfoMap.put(taskNo, list);
+        }
+        list.add(blockInfo);
+      }
+
+    }
+    return taskIdToTableBlockInfoMap;
+  }
+
+  /**
+   * remove all the details of a table this will be used in case of drop table
+   *
+   * @param absoluteTableIdentifier absolute table identifier to find the table
+   */
+  public void clear(AbsoluteTableIdentifier absoluteTableIdentifier) {
+    // removing all the details of table
+    tableLockMap.remove(absoluteTableIdentifier);
+    tableSegmentMap.remove(absoluteTableIdentifier);
+  }
+
+  /**
+   * Below method will be used to remove the segment block based on
+   * segment id is passed
+   *
+   * @param segmentToBeRemoved      segment to be removed
+   * @param absoluteTableIdentifier absoluteTableIdentifier
+   */
+  public void removeTableBlocks(List<String> segmentToBeRemoved,
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    // get the lock object if lock object is not present then it is not
+    // loaded at all
+    // we can return from here
+    Object lockObject = tableLockMap.get(absoluteTableIdentifier);
+    if (null == lockObject) {
+      return;
+    }
+    // the lock is only checked for presence, it is not acquired here
+    Map<String, Map<String, AbstractIndex>> map = tableSegmentMap.get(absoluteTableIdentifier);
+    // if there is no loaded blocks then return
+    if (null == map) {
+      return;
+    }
+    for (String segmentId : segmentToBeRemoved) {
+      map.remove(segmentId);
+    }
+  }
+
+  /**
+   * Below method will be used to check if segment blocks
+   * is already loaded or not
+   *
+   * @param absoluteTableIdentifier
+   * @param segmentId
+   * @return is loaded then return the loaded blocks otherwise null
+   */
+  public Map<String, AbstractIndex> getSegmentBTreeIfExists(
+      AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) {
+    Map<String, Map<String, AbstractIndex>> tableSegment =
+        tableSegmentMap.get(absoluteTableIdentifier);
+    if (null == tableSegment) {
+      return null;
+    }
+    return tableSegment.get(segmentId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/AbstractIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/AbstractIndex.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/AbstractIndex.java
new file mode 100644
index 0000000..7e1ed8c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/AbstractIndex.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.block;
+
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.DataRefNode;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+
+public abstract class AbstractIndex {
+
+  /**
+   * vo class which will hold the restructuring (RS) information of the block
+   */
+  protected SegmentProperties segmentProperties;
+
+  /**
+   * data reference node of the index
+   */
+  protected DataRefNode dataRefNode;
+
+  /**
+   * total number of rows present in the block
+   */
+  protected long totalNumberOfRows;
+
+  /**
+   * @return the totalNumberOfRows
+   */
+  public long getTotalNumberOfRows() {
+    return totalNumberOfRows;
+  }
+
+  /**
+   * @return the segmentProperties
+   */
+  public SegmentProperties getSegmentProperties() {
+    return segmentProperties;
+  }
+
+  /**
+   * @return the dataRefNode
+   */
+  public DataRefNode getDataRefNode() {
+    return dataRefNode;
+  }
+
+  /**
+   * Below method will be used to build the index
+   *
+   * @param footerList data file footers from which the index will be built
+   */
+  public abstract void buildIndex(List<DataFileFooter> footerList);
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockIndex.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockIndex.java
new file mode 100644
index 0000000..cfdb127
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockIndex.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.block;
+
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.BTreeBuilderInfo;
+import org.apache.carbondata.core.carbon.datastore.BtreeBuilder;
+import org.apache.carbondata.core.carbon.datastore.impl.btree.BlockletBTreeBuilder;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+
+/**
+ * Index of a table block. It builds the blocklet b+ tree of the block and keeps
+ * the segment properties, the data reference node and the number of rows of it
+ */
+public class BlockIndex extends AbstractIndex {
+
+  /**
+   * Builds the index from the given footers; schema, cardinality and number of
+   * rows are taken from the first footer only, the tree is built from the whole list
+   * @param footerList data file footers, must have at least one footer
+   */
+  public void buildIndex(List<DataFileFooter> footerList) {
+    // the first footer gives the metadata details which are useful in query handling
+    DataFileFooter firstFooter = footerList.get(0);
+    this.segmentProperties = new SegmentProperties(firstFooter.getColumnInTable(),
+        firstFooter.getSegmentInfo().getColumnCardinality());
+    // create the builder info first and then build the tree from it
+    BTreeBuilderInfo builderInfo =
+        new BTreeBuilderInfo(footerList, segmentProperties.getDimensionColumnsValueSize());
+    BtreeBuilder treeBuilder = new BlockletBTreeBuilder();
+    treeBuilder.build(builderInfo);
+    this.dataRefNode = treeBuilder.get();
+    // the list is read again here, after the build, as it was done before
+    this.totalNumberOfRows = footerList.get(0).getNumberOfRows();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java
new file mode 100644
index 0000000..817aafc
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.block;
+
+/**
+ * Abstract class which maintains the locations of a node.
+ */
+public abstract class Distributable implements Comparable<Distributable> {
+
+  public abstract String[] getLocations();
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentProperties.java
new file mode 100644
index 0000000..816ca3a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentProperties.java
@@ -0,0 +1,748 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.block;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.columnar.ColumnGroupModel;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
+import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthVariableSplitGenerator;
+import org.apache.carbondata.core.keygenerator.mdkey.MultiDimKeyVarLengthGenerator;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+/**
+ * This class contains all the details about the restructuring information of
+ * the block. This will be used during query execution to handle restructure
+ * information
+ */
+public class SegmentProperties {
+
+  /**
+   * key generator of the block which was used to generate the mdkey for
+   * normal dimension. this will be required to
+   */
+  private KeyGenerator dimensionKeyGenerator;
+
+  /**
+   * list of dimension present in the block
+   */
+  private List<CarbonDimension> dimensions;
+
+  /**
+   * list of complex dimension present in the block
+   */
+  private List<CarbonDimension> complexDimensions;
+
+  /**
+   * list of measure present in the block
+   */
+  private List<CarbonMeasure> measures;
+
+  /**
+   * cardinality of dimension columns participated in key generator
+   */
+  private int[] dimColumnsCardinality;
+
+  /**
+   * cardinality of complex dimension
+   */
+  private int[] complexDimColumnCardinality;
+
+  /**
+   * mapping of dimension column to block in a file this will be used for
+   * reading the blocks from file
+   */
+  private Map<Integer, Integer> dimensionOrdinalToBlockMapping;
+
+  /**
+   * a block can have multiple columns. This will have block index as key
+   * and all dimension participated in that block as values
+   */
+  private Map<Integer, Set<Integer>> blockTodimensionOrdinalMapping;
+
+  /**
+   * mapping of measure column to block to in file this will be used while
+   * reading the block in a file
+   */
+  private Map<Integer, Integer> measuresOrdinalToBlockMapping;
+
+  /**
+   * size of the each dimension column value in a block this can be used when
+   * we need to do copy a cell value to create a tuple.for no dictionary
+   * column this value will be -1. for dictionary column we size of the value
+   * will be fixed.
+   */
+  private int[] eachDimColumnValueSize;
+
+  /**
+   * size of the each complex dimension column value in a block this can be used when
+   * we need to do copy a cell value to create a tuple.for no dictionary
+   * column this value will be -1. for dictionary column we size of the value
+   * will be fixed.
+   */
+  private int[] eachComplexDimColumnValueSize;
+
+  /**
+   * below mapping will have mapping of the column group to dimensions ordinal
+   * for example if 3 dimension present in the columngroupid 0 and its ordinal in
+   * 2,3,4 then map will contain 0,{2,3,4}
+   */
+  private Map<Integer, KeyGenerator> columnGroupAndItsKeygenartor;
+
+  /**
+   * column group key generator dimension index will not be same as dimension ordinal
+   * This will have mapping with ordinal and keygenerator or mdkey index
+   */
+  private Map<Integer, Map<Integer, Integer>> columnGroupOrdinalToMdkeymapping;
+
+  /**
+   * this will be used to split the fixed length key
+   * this will all the information about how key was created
+   * and how to split the key based on group
+   */
+  private ColumnarSplitter fixedLengthKeySplitter;
+
+  /**
+   * to store the number of no dictionary dimension
+   * this will be used during query execution for creating
+   * start and end key. Purpose of storing this value here is
+   * so during query execution no need to calculate every time
+   */
+  private int numberOfNoDictionaryDimension;
+
+  /**
+   * column group model
+   */
+  private ColumnGroupModel colGroupModel;
+
+  public SegmentProperties(List<ColumnSchema> columnsInTable, int[] columnCardinality) {
+    dimensions = new ArrayList<CarbonDimension>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    complexDimensions =
+        new ArrayList<CarbonDimension>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    measures = new ArrayList<CarbonMeasure>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    fillDimensionAndMeasureDetails(columnsInTable, columnCardinality);
+    dimensionOrdinalToBlockMapping =
+        new HashMap<Integer, Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    blockTodimensionOrdinalMapping =
+        new HashMap<Integer, Set<Integer>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    measuresOrdinalToBlockMapping =
+        new HashMap<Integer, Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    intialiseColGroups(columnsInTable); // reads the dimensions list
+    fillOrdinalToBlockMappingForDimension(); // fills both dimension mapping maps created above
+    fillOrdinalToBlockIndexMappingForMeasureColumns(); // fills the measure mapping map
+    fillColumnGroupAndItsCardinality(columnCardinality);
+    fillKeyGeneratorDetails();
+  }
+
+  /**
+   * Groups the ordinals of the dictionary dimensions by column group id (-1 is never
+   * grouped) and fills the column group model, e.g {{1},{2,3,4},{5},{6},{7,8,9}}
+   *
+   * @param columnsInTable columns in table, not used by this method
+   */
+  private void intialiseColGroups(List<ColumnSchema> columnsInTable) {
+    // collect the ordinals of the dictionary dimensions, group by group
+    List<List<Integer>> groupedOrdinals = new ArrayList<List<Integer>>();
+    List<Integer> currentGroup = new ArrayList<Integer>();
+    int lastIndex = dimensions.size() - 1;
+    for (int index = 0; index <= lastIndex; index++) {
+      CarbonDimension current = dimensions.get(index);
+      if (current.hasEncoding(Encoding.DICTIONARY)) {
+        currentGroup.add(current.getOrdinal());
+        if (index == lastIndex) {
+          groupedOrdinals.add(currentGroup);
+        } else {
+          int groupId = current.columnGroupId();
+          int followingGroupId = dimensions.get(index + 1).columnGroupId();
+          // the group ends here unless the next dimension has the same (not -1) group id
+          if (groupId != followingGroupId || groupId == -1) {
+            groupedOrdinals.add(currentGroup);
+            currentGroup = new ArrayList<Integer>();
+          }
+        }
+      }
+    }
+    int[][] colGroups = new int[groupedOrdinals.size()][];
+    for (int g = 0; g < colGroups.length; g++) {
+      List<Integer> ordinals = groupedOrdinals.get(g);
+      colGroups[g] = new int[ordinals.size()];
+      for (int k = 0; k < colGroups[g].length; k++) {
+        colGroups[g][k] = ordinals.get(k);
+      }
+    }
+    this.colGroupModel = CarbonUtil.getColGroupModel(colGroups);
+  }
+
+  /**
+   * Fills the dimension ordinal to block index mapping; all the dimensions of one
+   * column group point to the same block, complex dimensions and children come last
+   */
+  private void fillOrdinalToBlockMappingForDimension() {
+    // block index which was assigned last, -1 until the first dimension is mapped
+    int currentBlock = -1;
+    // column group id of the dimension which was mapped before the current one
+    int previousGroupId = -1;
+    for (CarbonDimension dimension : dimensions) {
+      // a columnar dimension or a dimension with a different column group id than the
+      // previous one takes a new block, otherwise the block index will be the same
+      boolean startsNewBlock =
+          dimension.isColumnar() || dimension.columnGroupId() != previousGroupId;
+      if (startsNewBlock) {
+        currentBlock++;
+      }
+      dimensionOrdinalToBlockMapping.put(dimension.getOrdinal(), currentBlock);
+      previousGroupId = dimension.columnGroupId();
+    }
+    // complex dimensions are stored at last, each one takes the next block
+    // and its children take the blocks following it
+    for (CarbonDimension complexDimension : complexDimensions) {
+      currentBlock++;
+      dimensionOrdinalToBlockMapping.put(complexDimension.getOrdinal(), currentBlock);
+      currentBlock = fillComplexDimensionChildBlockIndex(currentBlock, complexDimension);
+    }
+    // fill the block to ordinals mapping from the mapping filled above
+    fillBlockToDimensionOrdinalMapping();
+  }
+
+  /**
+   * Fills the block index to dimension ordinals mapping from the ordinal to block mapping
+   */
+  private void fillBlockToDimensionOrdinalMapping() {
+    // every ordinal is added to the set of the block it is mapped to
+    for (Entry<Integer, Integer> ordinalToBlock : dimensionOrdinalToBlockMapping.entrySet()) {
+      Integer blockIndex = ordinalToBlock.getValue();
+      Set<Integer> ordinalsOfBlock = blockTodimensionOrdinalMapping.get(blockIndex);
+      if (null == ordinalsOfBlock) {
+        // first ordinal of this block, so add a new set for the block
+        ordinalsOfBlock = new HashSet<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+        blockTodimensionOrdinalMapping.put(blockIndex, ordinalsOfBlock);
+      }
+      ordinalsOfBlock.add(ordinalToBlock.getKey());
+    }
+  }
+
+  /**
+   * Recursively maps the ordinal of every child of the given complex dimension
+   * to a block index; each child takes the next block index and is followed
+   * by the block indexes of its own children
+   *
+   * @param blockOrdinal block index assigned last, the first child gets the next one
+   * @param dimension    parent dimension
+   * @return block index which was assigned last
+   */
+  private int fillComplexDimensionChildBlockIndex(int blockOrdinal, CarbonDimension dimension) {
+    for (int childIndex = 0; childIndex < dimension.numberOfChild(); childIndex++) {
+      CarbonDimension child = dimension.getListOfChildDimensions().get(childIndex);
+      blockOrdinal++;
+      dimensionOrdinalToBlockMapping.put(child.getOrdinal(), blockOrdinal);
+      if (child.numberOfChild() > 0) {
+        blockOrdinal = fillComplexDimensionChildBlockIndex(blockOrdinal, child);
+      }
+    }
+    return blockOrdinal;
+  }
+
+  /**
+   * Fills the measure ordinal to block index mapping. The block index of a
+   * measure is simply its position in the measures list.
+   */
+  private void fillOrdinalToBlockIndexMappingForMeasureColumns() {
+    for (int blockIndex = 0; blockIndex < measures.size(); blockIndex++) {
+      measuresOrdinalToBlockMapping.put(measures.get(blockIndex).getOrdinal(), blockIndex);
+    }
+  }
+
+  /**
+   * Walks the column schemas once and fills the dimension, complex dimension and
+   * measure lists of this segment. Afterwards it derives dimColumnsCardinality and
+   * complexDimColumnCardinality from the given cardinality array.
+   *
+   * @param columnsInTable    column schemas of the table; once the first complex
+   *                          dimension is seen every following dimension is treated
+   *                          as complex
+   * @param columnCardinality cardinality values, indexed by the position of the
+   *                          column among the dimension columns (measures are not
+   *                          counted)
+   */
+  private void fillDimensionAndMeasureDetails(List<ColumnSchema> columnsInTable,
+      int[] columnCardinality) {
+    ColumnSchema columnSchema = null;
+    // ordinal will be required to read the data from file block
+    int dimensonOrdinal = 0;
+    int measureOrdinal = -1;
+    // table ordinal is the position of the column among the dimension columns
+    // of the schema. It is used as index into the cardinality array of the
+    // segment info.
+    // NOTE(review): the original comment was cut off; presumably that array holds
+    // -1 for a column that is not part of the mdkey - confirm
+    int tableOrdinal = -1;
+    // creating a list as we do not know how many dimension not participated
+    // in the mdkey
+    List<Integer> cardinalityIndexForNormalDimensionColumn =
+        new ArrayList<Integer>(columnsInTable.size());
+    // same for the complex dimensions and their children
+    List<Integer> cardinalityIndexForComplexDimensionColumn =
+        new ArrayList<Integer>(columnsInTable.size());
+    boolean isComplexDimensionStarted = false;
+    CarbonDimension carbonDimension = null;
+    // to store the position of dimension in surrogate key array which is
+    // participating in mdkey
+    int keyOrdinal = 0;
+    int previousColumnGroup = -1;
+    // to store the ordinal of the column inside its column group
+    int columnGroupOrdinal = 0;
+    int counter = 0;
+    int complexTypeOrdinal = 0;
+    while (counter < columnsInTable.size()) {
+      columnSchema = columnsInTable.get(counter);
+      if (columnSchema.isDimensionColumn()) {
+        tableOrdinal++;
+        // not adding the cardinality of the non dictionary
+        // column as it was not the part of mdkey
+        if (CarbonUtil.hasEncoding(columnSchema.getEncodingList(), Encoding.DICTIONARY)
+            && !isComplexDimensionStarted && columnSchema.getNumberOfChild() == 0) {
+          cardinalityIndexForNormalDimensionColumn.add(tableOrdinal);
+          if (columnSchema.isColumnar()) {
+            // if it is a columnar dimension participated in mdkey then added
+            // key ordinal and dimension ordinal
+            carbonDimension =
+                new CarbonDimension(columnSchema, dimensonOrdinal++, keyOrdinal++, -1, -1);
+          } else {
+            // if not columnar then it is a column group dimension
+
+            // below code to handle first dimension of the column group
+            // in this case ordinal of the column group will be 0
+            if (previousColumnGroup != columnSchema.getColumnGroupId()) {
+              columnGroupOrdinal = 0;
+              carbonDimension = new CarbonDimension(columnSchema, dimensonOrdinal++, keyOrdinal++,
+                  columnGroupOrdinal++, -1);
+            }
+            // if previous dimension column group id is same as current then
+            // it belongs to the same column group; the dimension is built exactly
+            // as above, only the column group ordinal is not reset
+            else {
+              carbonDimension = new CarbonDimension(columnSchema, dimensonOrdinal++, keyOrdinal++,
+                  columnGroupOrdinal++, -1);
+            }
+            previousColumnGroup = columnSchema.getColumnGroupId();
+          }
+        }
+        // as complex type will be stored at last so once complex type started all the dimension
+        // will be added to complex type
+        else if (isComplexDimensionStarted || CarbonUtil.hasDataType(columnSchema.getDataType(),
+            new DataType[] { DataType.ARRAY, DataType.STRUCT })) {
+          cardinalityIndexForComplexDimensionColumn.add(tableOrdinal);
+          carbonDimension =
+              new CarbonDimension(columnSchema, dimensonOrdinal++, -1, -1, complexTypeOrdinal++);
+          carbonDimension.initializeChildDimensionsList(columnSchema.getNumberOfChild());
+          complexDimensions.add(carbonDimension);
+          isComplexDimensionStarted = true;
+          int previouseOrdinal = dimensonOrdinal;
+          dimensonOrdinal =
+              readAllComplexTypeChildrens(dimensonOrdinal, columnSchema.getNumberOfChild(),
+                  columnsInTable, carbonDimension, complexTypeOrdinal);
+          // every child that was read also owns one entry of the cardinality array
+          int numberOfChildrenDimensionAdded = dimensonOrdinal - previouseOrdinal;
+          for (int i = 0; i < numberOfChildrenDimensionAdded; i++) {
+            cardinalityIndexForComplexDimensionColumn.add(++tableOrdinal);
+          }
+          // skip the children which are already consumed.
+          // NOTE(review): this uses the dimension ordinal as index into
+          // columnsInTable, which assumes no measure column precedes it - confirm
+          counter = dimensonOrdinal;
+          // continue the complex type ordinal after the last direct child.
+          // NOTE(review): throws if the complex dimension has no child - confirm
+          // that such a schema cannot occur
+          complexTypeOrdinal = carbonDimension.getListOfChildDimensions()
+              .get(carbonDimension.getListOfChildDimensions().size() - 1).getComplexTypeOrdinal();
+          complexTypeOrdinal++;
+          // complex dimensions are kept only in complexDimensions, not in dimensions
+          continue;
+        } else {
+          // for no dictionary dimension
+          carbonDimension = new CarbonDimension(columnSchema, dimensonOrdinal++, -1, -1, -1);
+          numberOfNoDictionaryDimension++;
+        }
+        dimensions.add(carbonDimension);
+      } else {
+        measures.add(new CarbonMeasure(columnSchema, ++measureOrdinal));
+      }
+      counter++;
+    }
+    dimColumnsCardinality = new int[cardinalityIndexForNormalDimensionColumn.size()];
+    complexDimColumnCardinality = new int[cardinalityIndexForComplexDimensionColumn.size()];
+    int index = 0;
+    // filling the cardinality of the dimension column to create the key
+    // generator
+    for (Integer cardinalityArrayIndex : cardinalityIndexForNormalDimensionColumn) {
+      dimColumnsCardinality[index++] = columnCardinality[cardinalityArrayIndex];
+    }
+    index = 0;
+    // filling the cardinality of the complex dimension column to create the
+    // key generator
+    for (Integer cardinalityArrayIndex : cardinalityIndexForComplexDimensionColumn) {
+      complexDimColumnCardinality[index++] = columnCardinality[cardinalityArrayIndex];
+    }
+  }
+
+  /**
+   * Reads the children of a complex dimension from the column list and attaches
+   * them to the parent dimension. A child that has children of its own is read
+   * recursively.
+   *
+   * @param dimensionOrdinal        ordinal of the first child, also used as its index
+   *                                in listOfColumns
+   * @param childCount              number of direct children to read
+   * @param listOfColumns           all the columns of the table
+   * @param parentDimension         dimension which receives the children
+   * @param complexDimensionOrdianl complex type ordinal given to the first child
+   * @return dimension ordinal following the last child that was read
+   */
+  private int readAllComplexTypeChildrens(int dimensionOrdinal, int childCount,
+      List<ColumnSchema> listOfColumns, CarbonDimension parentDimension,
+      int complexDimensionOrdianl) {
+    int nextOrdinal = dimensionOrdinal;
+    int nextComplexOrdinal = complexDimensionOrdianl;
+    for (int childIndex = 0; childIndex < childCount; childIndex++) {
+      ColumnSchema childSchema = listOfColumns.get(nextOrdinal);
+      if (!childSchema.isDimensionColumn()) {
+        continue;
+      }
+      CarbonDimension childDimension =
+          new CarbonDimension(childSchema, nextOrdinal++, -1, -1, nextComplexOrdinal++);
+      int grandChildCount = childSchema.getNumberOfChild();
+      if (grandChildCount > 0) {
+        childDimension.initializeChildDimensionsList(grandChildCount);
+      }
+      parentDimension.getListOfChildDimensions().add(childDimension);
+      if (grandChildCount > 0) {
+        // NOTE(review): the complex ordinals used inside the recursion are not
+        // carried back, the next sibling continues from nextComplexOrdinal - confirm
+        nextOrdinal = readAllComplexTypeChildrens(nextOrdinal, grandChildCount, listOfColumns,
+            childDimension, nextComplexOrdinal);
+      }
+    }
+    return nextOrdinal;
+  }
+
+  /**
+   * Creates the dimension key generator and the fixed length key splitter and
+   * computes the stored value size of every dimension block, for the normal as
+   * well as for the complex dimensions. Blocks which are not dictionary encoded
+   * get the size -1.
+   */
+  private void fillKeyGeneratorDetails() {
+    // number of columns stored together in each mdkey block: 1 for a columnar
+    // dimension, the number of columns for a column group
+    List<Integer> columnsPerBlock =
+        new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // one flag per stored block, false for a block which is not part of the mdkey
+    List<Boolean> dictionaryFlagPerBlock =
+        new ArrayList<Boolean>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    int lastColumnGroupId = -1;
+    for (CarbonDimension current : dimensions) {
+      if (!current.getEncoder().contains(Encoding.DICTIONARY)) {
+        // not a part of mdkey, nothing to add to the partitioner
+        dictionaryFlagPerBlock.add(false);
+        continue;
+      }
+      boolean continuesColumnGroup =
+          !current.isColumnar() && current.columnGroupId() == lastColumnGroupId;
+      if (continuesColumnGroup) {
+        // one more column in the column group which was opened last
+        int lastBlock = columnsPerBlock.size() - 1;
+        columnsPerBlock.set(lastBlock, columnsPerBlock.get(lastBlock) + 1);
+      } else {
+        // a columnar dimension or the first column of a new column group
+        columnsPerBlock.add(1);
+        dictionaryFlagPerBlock.add(true);
+      }
+      lastColumnGroupId = current.columnGroupId();
+    }
+    int[] dimensionPartitions = ArrayUtils
+        .toPrimitive(columnsPerBlock.toArray(new Integer[columnsPerBlock.size()]));
+    // bit length of each column decides the layout of the key
+    int[] bitLength = CarbonUtil.getDimensionBitLength(dimColumnsCardinality, dimensionPartitions);
+    this.dimensionKeyGenerator = new MultiDimKeyVarLengthGenerator(bitLength);
+    this.fixedLengthKeySplitter =
+        new MultiDimKeyVarLengthVariableSplitGenerator(bitLength, dimensionPartitions);
+    // the splitter knows the size of the dictionary blocks only, so spread
+    // them over all the blocks and mark the remaining ones with -1
+    int[] dictionaryBlockSizes = fixedLengthKeySplitter.getBlockKeySize();
+    this.eachDimColumnValueSize = new int[dictionaryFlagPerBlock.size()];
+    int nextDictionaryBlock = 0;
+    for (int block = 0; block < eachDimColumnValueSize.length; block++) {
+      eachDimColumnValueSize[block] =
+          dictionaryFlagPerBlock.get(block) ? dictionaryBlockSizes[nextDictionaryBlock++] : -1;
+    }
+    if (complexDimensions.isEmpty()) {
+      eachComplexDimColumnValueSize = new int[0];
+      return;
+    }
+    // as complex dimension will be stored in column format every partition is 1
+    int[] complexPartitions = new int[complexDimColumnCardinality.length];
+    Arrays.fill(complexPartitions, 1);
+    int[] complexBitLength =
+        CarbonUtil.getDimensionBitLength(complexDimColumnCardinality, complexPartitions);
+    for (int column = 0; column < complexBitLength.length; column++) {
+      // a column with cardinality 0 gets the full 64 bits
+      if (complexDimColumnCardinality[column] == 0) {
+        complexBitLength[column] = 64;
+      }
+    }
+    ColumnarSplitter complexSplitter =
+        new MultiDimKeyVarLengthVariableSplitGenerator(complexBitLength, complexPartitions);
+    eachComplexDimColumnValueSize = complexSplitter.getBlockKeySize();
+  }
+
+  /**
+   * Creates, for every column group, a key generator and a mapping from the
+   * dimension ordinal to its position inside the mdkey of the group. This is used
+   * during query execution to create a mask key for the column group dimension,
+   * as a column group dimension is stored at the bit level.
+   *
+   * @param cardinality cardinality values, indexed by the position of the
+   *                    dimension in the dimensions list
+   */
+  private void fillColumnGroupAndItsCardinality(int[] cardinality) {
+    // column group id to the positions of its dimensions
+    Map<Integer, List<Integer>> groupIdToOrdinals =
+        new HashMap<Integer, List<Integer>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<Integer> openGroup = null;
+    int lastGroupId = -1;
+    for (int ordinal = 0; ordinal < dimensions.size(); ordinal++) {
+      CarbonDimension current = dimensions.get(ordinal);
+      if (!current.isColumnar()) {
+        if (current.columnGroupId() != lastGroupId || null == openGroup) {
+          // a new column group has come, start a list for it
+          openGroup = new ArrayList<Integer>();
+          groupIdToOrdinals.put(current.columnGroupId(), openGroup);
+        }
+        openGroup.add(ordinal);
+      }
+      // remember the id every time, this is required to group the columns
+      // of the same column group
+      lastGroupId = current.columnGroupId();
+    }
+    this.columnGroupAndItsKeygenartor =
+        new HashMap<Integer, KeyGenerator>(groupIdToOrdinals.size());
+    this.columnGroupOrdinalToMdkeymapping = new HashMap<>(groupIdToOrdinals.size());
+    Iterator<Entry<Integer, List<Integer>>> groups = groupIdToOrdinals.entrySet().iterator();
+    while (groups.hasNext()) {
+      Entry<Integer, List<Integer>> group = groups.next();
+      List<Integer> ordinalsInGroup = group.getValue();
+      int groupSize = ordinalsInGroup.size();
+      Map<Integer, Integer> ordinalToMdkeyPosition = new HashMap<>(groupSize);
+      int[] groupCardinality = new int[groupSize];
+      for (int position = 0; position < groupSize; position++) {
+        Integer dimensionOrdinal = ordinalsInGroup.get(position);
+        groupCardinality[position] = cardinality[dimensionOrdinal];
+        ordinalToMdkeyPosition.put(dimensionOrdinal, position);
+      }
+      // the whole column group is one partition of the key
+      int[] groupBitLength =
+          CarbonUtil.getDimensionBitLength(groupCardinality, new int[] { groupSize });
+      this.columnGroupAndItsKeygenartor
+          .put(group.getKey(), new MultiDimKeyVarLengthGenerator(groupBitLength));
+      this.columnGroupOrdinalToMdkeymapping.put(group.getKey(), ordinalToMdkeyPosition);
+    }
+  }
+
+  /**
+   * Returns the value size of each dimension column in one new array: the sizes
+   * of the normal dimensions first, followed by the sizes of the complex
+   * dimensions. The array is built on every call instead of being kept as a
+   * class variable.
+   *
+   * @return each dimension value size
+   */
+  public int[] getDimensionColumnsValueSize() {
+    int normalCount = eachDimColumnValueSize.length;
+    int complexCount = eachComplexDimColumnValueSize.length;
+    // copy of the normal sizes, enlarged so that the complex sizes fit at the end
+    int[] merged = Arrays.copyOf(eachDimColumnValueSize, normalCount + complexCount);
+    System.arraycopy(eachComplexDimColumnValueSize, 0, merged, normalCount, complexCount);
+    return merged;
+  }
+
+  /**
+   * Returns the key generator of the dimensions, which fillKeyGeneratorDetails
+   * creates from the bit length of the dimension columns.
+   *
+   * @return the dimensionKeyGenerator
+   */
+  public KeyGenerator getDimensionKeyGenerator() {
+    return dimensionKeyGenerator;
+  }
+
+  /**
+   * Returns the dimensions which are not complex. The internal list itself is
+   * returned, not a copy.
+   *
+   * @return the dimensions
+   */
+  public List<CarbonDimension> getDimensions() {
+    return dimensions;
+  }
+
+  /**
+   * Returns the top level complex dimensions. The internal list itself is
+   * returned, not a copy.
+   *
+   * @return the complexDimensions
+   */
+  public List<CarbonDimension> getComplexDimensions() {
+    return complexDimensions;
+  }
+
+  /**
+   * Returns the measures of the table. The internal list itself is returned,
+   * not a copy.
+   *
+   * @return the measures
+   */
+  public List<CarbonMeasure> getMeasures() {
+    return measures;
+  }
+
+  /**
+   * Returns the cardinality of the dictionary dimensions which are not complex.
+   * The internal array itself is returned, not a copy.
+   *
+   * @return the dimColumnsCardinality
+   */
+  public int[] getDimColumnsCardinality() {
+    return dimColumnsCardinality;
+  }
+
+  /**
+   * Returns the cardinality of the complex dimensions and their children. The
+   * internal array itself is returned, not a copy.
+   *
+   * @return the complexDimColumnCardinality
+   */
+  public int[] getComplexDimColumnCardinality() {
+    return complexDimColumnCardinality;
+  }
+
+  /**
+   * Returns the mapping of dimension ordinal to the index of the block which
+   * stores it. The internal map itself is returned, not a copy.
+   *
+   * @return the dimensionOrdinalToBlockMapping
+   */
+  public Map<Integer, Integer> getDimensionOrdinalToBlockMapping() {
+    return dimensionOrdinalToBlockMapping;
+  }
+
+  /**
+   * Returns the mapping of measure ordinal to the index of the block which
+   * stores it. The internal map itself is returned, not a copy.
+   *
+   * @return the measuresOrdinalToBlockMapping
+   */
+  public Map<Integer, Integer> getMeasuresOrdinalToBlockMapping() {
+    return measuresOrdinalToBlockMapping;
+  }
+
+  /**
+   * Returns the value size of each normal dimension block. A block which is not
+   * dictionary encoded has the size -1. The internal array itself is returned,
+   * not a copy.
+   *
+   * @return the eachDimColumnValueSize
+   */
+  public int[] getEachDimColumnValueSize() {
+    return eachDimColumnValueSize;
+  }
+
+  /**
+   * Returns the value size of each complex dimension block. The array is empty
+   * when there is no complex dimension. The internal array itself is returned,
+   * not a copy.
+   *
+   * @return the eachComplexDimColumnValueSize
+   */
+  public int[] getEachComplexDimColumnValueSize() {
+    return eachComplexDimColumnValueSize;
+  }
+
+  /**
+   * Returns the splitter which fillKeyGeneratorDetails creates from the bit
+   * length and the partitions of the dimension columns.
+   *
+   * @return the fixedLengthKeySplitter
+   */
+  public ColumnarSplitter getFixedLengthKeySplitter() {
+    return fixedLengthKeySplitter;
+  }
+
+  /**
+   * Returns the mapping of column group id to the key generator of that column
+   * group. The internal map itself is returned, not a copy.
+   *
+   * @return the columnGroupAndItsKeygenartor
+   */
+  public Map<Integer, KeyGenerator> getColumnGroupAndItsKeygenartor() {
+    return columnGroupAndItsKeygenartor;
+  }
+
+  /**
+   * Returns the number of dimensions which are neither dictionary encoded nor
+   * complex, as counted by fillDimensionAndMeasureDetails.
+   *
+   * @return the numberOfNoDictionaryDimension
+   */
+  public int getNumberOfNoDictionaryDimension() {
+    return numberOfNoDictionaryDimension;
+  }
+
+  /**
+   * Returns the column groups as reported by the column group model.
+   *
+   * @return result of getColumnGroup() of the colGroupModel; presumably one
+   * array per column group - TODO confirm against ColumnGroupModel
+   */
+  public int[][] getColumnGroups() {
+    return colGroupModel.getColumnGroup();
+  }
+
+  /**
+   * Returns the column group model of this segment.
+   *
+   * @return colGroupModel
+   */
+  public ColumnGroupModel getColumnGroupModel() {
+    return this.colGroupModel;
+  }
+
+  /**
+   * get mdkey ordinal for given dimension ordinal of given column group
+   *
+   * @param colGrpId id of the column group
+   * @param ordinal  ordinal of the dimension; fillColumnGroupAndItsCardinality
+   *                 registers the position of the dimension in the dimensions list
+   * @return mdkeyordinal, the position of the dimension inside its column group
+   */
+  public int getColumnGroupMdKeyOrdinal(int colGrpId, int ordinal) {
+    // NOTE(review): an unknown column group id or ordinal ends in a
+    // NullPointerException, as nothing is checked before the unboxing
+    return columnGroupOrdinalToMdkeymapping.get(colGrpId).get(ordinal);
+  }
+
+  /**
+   * It returns the number of columns available in the given column group
+   *
+   * @param colGrpId id of the column group
+   * @return no of column in given column group
+   */
+  public int getNoOfColumnsInColumnGroup(int colGrpId) {
+    // NOTE(review): an unknown column group id ends in a NullPointerException
+    return columnGroupOrdinalToMdkeymapping.get(colGrpId).size();
+  }
+
+  /**
+   * Returns the ordinals of all the dimensions which are stored in the given
+   * block. The internal set itself is returned, not a copy.
+   *
+   * @param blockIndex index of the block
+   * @return all dimension ordinals present in the given block index; presumably
+   * null when nothing is mapped to the block - TODO confirm
+   */
+  public Set<Integer> getDimensionOrdinalForBlock(int blockIndex) {
+    return blockTodimensionOrdinalMapping.get(blockIndex);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndex.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndex.java
new file mode 100644
index 0000000..ce7a63a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndex.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.block;
+
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.BTreeBuilderInfo;
+import org.apache.carbondata.core.carbon.datastore.BtreeBuilder;
+import org.apache.carbondata.core.carbon.datastore.impl.btree.BlockBTreeBuilder;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+
+/**
+ * Index of one task of a table segment. It builds the b+ tree of the blocks from
+ * the data file footers and keeps the segment properties and the total number
+ * of rows.
+ */
+public class SegmentTaskIndex extends AbstractIndex {
+
+  /**
+   * Builds the segment properties, the block b+ tree and the total number of
+   * rows from the given data file footers.
+   *
+   * @param footerList footers of the data files; the first one supplies the
+   *                   columns and the column cardinality of the segment
+   */
+  public void buildIndex(List<DataFileFooter> footerList) {
+    // all the data file metadata will have common segment properties, so the
+    // first footer is enough to create them
+    DataFileFooter firstFooter = footerList.get(0);
+    segmentProperties = new SegmentProperties(firstFooter.getColumnInTable(),
+        firstFooter.getSegmentInfo().getColumnCardinality());
+    // the second argument is left null here; the original note says the btree is
+    // built as per min max and start key, so no column value size is needed
+    BTreeBuilderInfo builderInfo = new BTreeBuilderInfo(footerList, null);
+    BtreeBuilder blockTreeBuilder = new BlockBTreeBuilder();
+    // load the metadata
+    blockTreeBuilder.build(builderInfo);
+    dataRefNode = blockTreeBuilder.get();
+    for (DataFileFooter fileFooter : footerList) {
+      totalNumberOfRows += fileFooter.getNumberOfRows();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
new file mode 100644
index 0000000..06166fd
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.block;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath.DataFileUtil;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+
+/**
+ * Describes one block of a data file. The block detail is passed from the driver
+ * to all the executors to load the b+ tree.
+ */
+public class TableBlockInfo extends Distributable
+    implements Serializable, Comparable<Distributable> {
+
+  /**
+   * serialization id
+   */
+  private static final long serialVersionUID = -6502868998599821172L;
+
+  /**
+   * full qualified file path of the block
+   */
+  private String filePath;
+
+  /**
+   * block offset in the file
+   */
+  private long blockOffset;
+
+  /**
+   * length of the block
+   */
+  private long blockLength;
+
+  /**
+   * id of the segment this will be used to sort the blocks
+   */
+  private String segmentId;
+
+  /**
+   * locations of the block; not a part of equals and hashCode
+   */
+  private String[] locations;
+
+  /**
+   * @param filePath    path of the file, it is stored in the form returned by
+   *                    FileFactory.getUpdatedFilePath
+   * @param blockOffset offset of the block in the file
+   * @param segmentId   id of the segment, compareTo parses it as a number
+   * @param locations   locations of the block, the array is stored as it is
+   * @param blockLength length of the block
+   */
+  public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
+      long blockLength) {
+    this.filePath = FileFactory.getUpdatedFilePath(filePath);
+    this.blockOffset = blockOffset;
+    this.segmentId = segmentId;
+    this.locations = locations;
+    this.blockLength = blockLength;
+  }
+
+  /**
+   * @return the filePath
+   */
+  public String getFilePath() {
+    return filePath;
+  }
+
+  /**
+   * @return the blockOffset
+   */
+  public long getBlockOffset() {
+    return blockOffset;
+  }
+
+  /**
+   * @return the segmentId
+   */
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  /**
+   * @return the blockLength
+   */
+  public long getBlockLength() {
+    return blockLength;
+  }
+
+  /**
+   * Two block infos are equal when segment id, block offset, block length and
+   * file path are equal. The locations are not compared.
+   *
+   * @see java.lang.Object#equals(java.lang.Object)
+   */
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof TableBlockInfo)) {
+      // covers null as well
+      return false;
+    }
+    TableBlockInfo other = (TableBlockInfo) obj;
+    if (blockOffset != other.blockOffset || blockLength != other.blockLength) {
+      return false;
+    }
+    return isSame(segmentId, other.segmentId) && isSame(filePath, other.filePath);
+  }
+
+  /**
+   * null safe equality check of two strings
+   */
+  private static boolean isSame(String first, String second) {
+    return first == null ? second == null : first.equals(second);
+  }
+
+  /**
+   * Below method will be used to compare two TableBlockInfo objects, this will
+   * be used for sorting. Comparison logic is:
+   * 1. compare the segment id as a number
+   * 2. for a carbon data file compare the task no and then the part no,
+   * otherwise compare the file path
+   * 3. compare the end position (offset + length) of the block
+   * Two blocks with the same end position but different offsets compare as 0
+   * although they are not equal.
+   *
+   * @param other block info to compare with, it has to be a TableBlockInfo
+   */
+  @Override public int compareTo(Distributable other) {
+    TableBlockInfo that = (TableBlockInfo) other;
+    // convert the segment id to double as it can have a fraction part
+    double seg1 = Double.parseDouble(segmentId);
+    double seg2 = Double.parseDouble(that.segmentId);
+    if (seg1 - seg2 < 0) {
+      return -1;
+    }
+    if (seg1 - seg2 > 0) {
+      return 1;
+    }
+
+    int compareResult;
+    if (CarbonTablePath.isCarbonDataFile(filePath)) {
+      // Integer.compare is used instead of a subtraction, as the difference of
+      // two ints can overflow and flip the sign
+      int firstTaskId = Integer.parseInt(DataFileUtil.getTaskNo(filePath));
+      int otherTaskId = Integer.parseInt(DataFileUtil.getTaskNo(that.filePath));
+      if (firstTaskId != otherTaskId) {
+        return Integer.compare(firstTaskId, otherTaskId);
+      }
+      // task id is same, compare the part no of both block info
+      int firstPartNo = Integer.parseInt(DataFileUtil.getPartNo(filePath));
+      int secondPartNo = Integer.parseInt(DataFileUtil.getPartNo(that.filePath));
+      compareResult = Integer.compare(firstPartNo, secondPartNo);
+    } else {
+      compareResult = filePath.compareTo(that.getFilePath());
+    }
+    if (compareResult != 0) {
+      return compareResult;
+    }
+    // if part no is also same then compare the offset and length of the block
+    return Long.compare(blockOffset + blockLength, that.blockOffset + that.blockLength);
+  }
+
+  /**
+   * Hash code over the fields which equals compares. The locations are left
+   * out, otherwise two equal block infos could get different hash codes.
+   */
+  @Override public int hashCode() {
+    int result = filePath == null ? 0 : filePath.hashCode();
+    result = 31 * result + (int) (blockOffset ^ (blockOffset >>> 32));
+    result = 31 * result + (int) (blockLength ^ (blockLength >>> 32));
+    result = 31 * result + (segmentId == null ? 0 : segmentId.hashCode());
+    return result;
+  }
+
+  /**
+   * @return the locations, the internal array itself and not a copy
+   */
+  @Override public String[] getLocations() {
+    return locations;
+  }
+
+}