You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/15 07:09:21 UTC

[36/52] [partial] incubator-carbondata git commit: Renamed packages to org.apache.carbondata and fixed errors

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/CarbonMergerUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMergerUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMergerUtil.java
new file mode 100644
index 0000000..9037e0d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMergerUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * Util class for merge activities of 2 loads.
+ */
+public class CarbonMergerUtil {
+
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonMergerUtil.class.getName());
+
+  public static int[] getCardinalityFromLevelMetadata(String path, String tableName) {
+    int[] localCardinality = null;
+    try {
+      localCardinality = CarbonUtil.getCardinalityFromLevelMetadataFile(
+          path + '/' + CarbonCommonConstants.LEVEL_METADATA_FILE + tableName + ".metadata");
+    } catch (CarbonUtilException e) {
+      LOGGER.error("Error occurred :: " + e.getMessage());
+    }
+
+    return localCardinality;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
new file mode 100644
index 0000000..de0ea44
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.metadata.index.BlockIndexInfo;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.BlockletBTreeIndex;
+import org.apache.carbondata.format.BlockletIndex;
+import org.apache.carbondata.format.BlockletInfo;
+import org.apache.carbondata.format.BlockletMinMaxIndex;
+import org.apache.carbondata.format.ChunkCompressionMeta;
+import org.apache.carbondata.format.ColumnSchema;
+import org.apache.carbondata.format.CompressionCodec;
+import org.apache.carbondata.format.DataChunk;
+import org.apache.carbondata.format.Encoding;
+import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.format.IndexHeader;
+import org.apache.carbondata.format.PresenceMeta;
+import org.apache.carbondata.format.SegmentInfo;
+import org.apache.carbondata.format.SortState;
+
+/**
+ * Util class to convert to thrift metdata classes
+ */
+public class CarbonMetadataUtil {
+
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonMetadataUtil.class.getName());
+
+  /**
+   * It converts list of BlockletInfoColumnar to FileFooter thrift objects
+   *
+   * @param infoList
+   * @param numCols
+   * @param cardinalities
+   * @return FileFooter
+   */
+  public static FileFooter convertFileFooter(List<BlockletInfoColumnar> infoList, int numCols,
+      int[] cardinalities, List<ColumnSchema> columnSchemaList,
+      SegmentProperties segmentProperties) throws IOException {
+
+    SegmentInfo segmentInfo = new SegmentInfo();
+    segmentInfo.setNum_cols(columnSchemaList.size());
+    segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(cardinalities));
+
+    FileFooter footer = new FileFooter();
+    footer.setNum_rows(getTotalNumberOfRows(infoList));
+    footer.setSegment_info(segmentInfo);
+    for (BlockletInfoColumnar info : infoList) {
+      footer.addToBlocklet_index_list(getBlockletIndex(info));
+    }
+    footer.setTable_columns(columnSchemaList);
+    for (BlockletInfoColumnar info : infoList) {
+      footer.addToBlocklet_info_list(getBlockletInfo(info, columnSchemaList, segmentProperties));
+    }
+    return footer;
+  }
+
+  private static BlockletIndex getBlockletIndex(
+      org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex info) {
+    BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
+
+    for (int i = 0; i < info.getMinMaxIndex().getMaxValues().length; i++) {
+      blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(info.getMinMaxIndex().getMaxValues()[i]));
+      blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(info.getMinMaxIndex().getMinValues()[i]));
+    }
+    BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
+    blockletBTreeIndex.setStart_key(info.getBtreeIndex().getStartKey());
+    blockletBTreeIndex.setEnd_key(info.getBtreeIndex().getEndKey());
+    BlockletIndex blockletIndex = new BlockletIndex();
+    blockletIndex.setMin_max_index(blockletMinMaxIndex);
+    blockletIndex.setB_tree_index(blockletBTreeIndex);
+    return blockletIndex;
+  }
+
+  /**
+   * Get total number of rows for the file.
+   *
+   * @param infoList
+   * @return
+   */
+  private static long getTotalNumberOfRows(List<BlockletInfoColumnar> infoList) {
+    long numberOfRows = 0;
+    for (BlockletInfoColumnar info : infoList) {
+      numberOfRows += info.getNumberOfKeys();
+    }
+    return numberOfRows;
+  }
+
+  private static BlockletIndex getBlockletIndex(BlockletInfoColumnar info) {
+
+    BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
+    for (byte[] max : info.getColumnMaxData()) {
+      blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(max));
+    }
+    for (byte[] min : info.getColumnMinData()) {
+      blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
+    }
+    BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
+    blockletBTreeIndex.setStart_key(info.getStartKey());
+    blockletBTreeIndex.setEnd_key(info.getEndKey());
+
+    BlockletIndex blockletIndex = new BlockletIndex();
+    blockletIndex.setMin_max_index(blockletMinMaxIndex);
+    blockletIndex.setB_tree_index(blockletBTreeIndex);
+    return blockletIndex;
+  }
+
+  private static BlockletInfo getBlockletInfo(BlockletInfoColumnar blockletInfoColumnar,
+      List<ColumnSchema> columnSchenma,
+      SegmentProperties segmentProperties) throws IOException {
+
+    BlockletInfo blockletInfo = new BlockletInfo();
+    blockletInfo.setNum_rows(blockletInfoColumnar.getNumberOfKeys());
+
+    List<DataChunk> colDataChunks = new ArrayList<DataChunk>();
+    blockletInfoColumnar.getKeyLengths();
+    int j = 0;
+    int aggregateIndex = 0;
+    boolean[] isSortedKeyColumn = blockletInfoColumnar.getIsSortedKeyColumn();
+    boolean[] aggKeyBlock = blockletInfoColumnar.getAggKeyBlock();
+    boolean[] colGrpblock = blockletInfoColumnar.getColGrpBlocks();
+    for (int i = 0; i < blockletInfoColumnar.getKeyLengths().length; i++) {
+      DataChunk dataChunk = new DataChunk();
+      dataChunk.setChunk_meta(getChunkCompressionMeta());
+      List<Encoding> encodings = new ArrayList<Encoding>();
+      if (containsEncoding(i, Encoding.DICTIONARY, columnSchenma, segmentProperties)) {
+        encodings.add(Encoding.DICTIONARY);
+      }
+      if (containsEncoding(i, Encoding.DIRECT_DICTIONARY, columnSchenma, segmentProperties)) {
+        encodings.add(Encoding.DIRECT_DICTIONARY);
+      }
+      dataChunk.setRowMajor(colGrpblock[i]);
+      //TODO : Once schema PR is merged and information needs to be passed here.
+      dataChunk.setColumn_ids(new ArrayList<Integer>());
+      dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]);
+      dataChunk.setData_page_offset(blockletInfoColumnar.getKeyOffSets()[i]);
+      if (aggKeyBlock[i]) {
+        dataChunk.setRle_page_offset(blockletInfoColumnar.getDataIndexMapOffsets()[aggregateIndex]);
+        dataChunk.setRle_page_length(blockletInfoColumnar.getDataIndexMapLength()[aggregateIndex]);
+        encodings.add(Encoding.RLE);
+        aggregateIndex++;
+      }
+      dataChunk
+          .setSort_state(isSortedKeyColumn[i] ? SortState.SORT_EXPLICIT : SortState.SORT_NATIVE);
+
+      if (!isSortedKeyColumn[i]) {
+        dataChunk.setRowid_page_offset(blockletInfoColumnar.getKeyBlockIndexOffSets()[j]);
+        dataChunk.setRowid_page_length(blockletInfoColumnar.getKeyBlockIndexLength()[j]);
+        encodings.add(Encoding.INVERTED_INDEX);
+        j++;
+      }
+
+      //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+      dataChunk.setEncoders(encodings);
+
+      colDataChunks.add(dataChunk);
+    }
+
+    for (int i = 0; i < blockletInfoColumnar.getMeasureLength().length; i++) {
+      DataChunk dataChunk = new DataChunk();
+      dataChunk.setChunk_meta(getChunkCompressionMeta());
+      dataChunk.setRowMajor(false);
+      //TODO : Once schema PR is merged and information needs to be passed here.
+      dataChunk.setColumn_ids(new ArrayList<Integer>());
+      dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]);
+      dataChunk.setData_page_offset(blockletInfoColumnar.getMeasureOffset()[i]);
+      //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+      List<Encoding> encodings = new ArrayList<Encoding>();
+      encodings.add(Encoding.DELTA);
+      dataChunk.setEncoders(encodings);
+      //TODO writing dummy presence meta need to set actual presence
+      //meta
+      PresenceMeta presenceMeta = new PresenceMeta();
+      presenceMeta.setPresent_bit_streamIsSet(true);
+      presenceMeta
+          .setPresent_bit_stream(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray());
+      dataChunk.setPresence(presenceMeta);
+      //TODO : PresenceMeta needs to be implemented and set here
+      // dataChunk.setPresence(new PresenceMeta());
+      //TODO : Need to write ValueCompression meta here.
+      List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
+      encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
+          createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i))));
+      dataChunk.setEncoder_meta(encoderMetaList);
+      colDataChunks.add(dataChunk);
+    }
+    blockletInfo.setColumn_data_chunks(colDataChunks);
+
+    return blockletInfo;
+  }
+
+  /**
+   * @param blockIndex
+   * @param encoding
+   * @param columnSchemas
+   * @param segmentProperties
+   * @return return true if given encoding is present in column
+   */
+  private static boolean containsEncoding(int blockIndex, Encoding encoding,
+      List<ColumnSchema> columnSchemas, SegmentProperties segmentProperties) {
+    Set<Integer> dimOrdinals = segmentProperties.getDimensionOrdinalForBlock(blockIndex);
+    //column groups will always have dictionary encoding
+    if (dimOrdinals.size() > 1 && Encoding.DICTIONARY == encoding) {
+      return true;
+    }
+    for (Integer dimOrdinal : dimOrdinals) {
+      if (columnSchemas.get(dimOrdinal).encoders.contains(encoding)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static byte[] serializeEncoderMeta(ValueEncoderMeta encoderMeta) throws IOException {
+    // TODO : should remove the unnecessary fields.
+    ByteArrayOutputStream aos = new ByteArrayOutputStream();
+    ObjectOutputStream objStream = new ObjectOutputStream(aos);
+    objStream.writeObject(encoderMeta);
+    objStream.close();
+    return aos.toByteArray();
+  }
+
+  private static ValueEncoderMeta createValueEncoderMeta(ValueCompressionModel compressionModel,
+      int index) {
+    ValueEncoderMeta encoderMeta = new ValueEncoderMeta();
+    encoderMeta.setMaxValue(compressionModel.getMaxValue()[index]);
+    encoderMeta.setMinValue(compressionModel.getMinValue()[index]);
+    encoderMeta.setDataTypeSelected(compressionModel.getDataTypeSelected()[index]);
+    encoderMeta.setDecimal(compressionModel.getDecimal()[index]);
+    encoderMeta.setType(compressionModel.getType()[index]);
+    encoderMeta.setUniqueValue(compressionModel.getUniqueValue()[index]);
+    return encoderMeta;
+  }
+
+  /**
+   * Right now it is set to default values. We may use this in future
+   */
+  private static ChunkCompressionMeta getChunkCompressionMeta() {
+    ChunkCompressionMeta chunkCompressionMeta = new ChunkCompressionMeta();
+    chunkCompressionMeta.setCompression_codec(CompressionCodec.SNAPPY);
+    chunkCompressionMeta.setTotal_compressed_size(0);
+    chunkCompressionMeta.setTotal_uncompressed_size(0);
+    return chunkCompressionMeta;
+  }
+
+  /**
+   * It converts FileFooter thrift object to list of BlockletInfoColumnar objects
+   *
+   * @param footer
+   * @return
+   */
+  public static List<BlockletInfoColumnar> convertBlockletInfo(FileFooter footer)
+      throws IOException {
+    List<BlockletInfoColumnar> listOfNodeInfo =
+        new ArrayList<BlockletInfoColumnar>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    for (BlockletInfo blockletInfo : footer.getBlocklet_info_list()) {
+      BlockletInfoColumnar blockletInfoColumnar = new BlockletInfoColumnar();
+      blockletInfoColumnar.setNumberOfKeys(blockletInfo.getNum_rows());
+      List<DataChunk> columnChunks = blockletInfo.getColumn_data_chunks();
+      List<DataChunk> dictChunks = new ArrayList<DataChunk>();
+      List<DataChunk> nonDictColChunks = new ArrayList<DataChunk>();
+      for (DataChunk dataChunk : columnChunks) {
+        if (dataChunk.getEncoders().get(0).equals(Encoding.DICTIONARY)) {
+          dictChunks.add(dataChunk);
+        } else {
+          nonDictColChunks.add(dataChunk);
+        }
+      }
+      int[] keyLengths = new int[dictChunks.size()];
+      long[] keyOffSets = new long[dictChunks.size()];
+      long[] keyBlockIndexOffsets = new long[dictChunks.size()];
+      int[] keyBlockIndexLens = new int[dictChunks.size()];
+      long[] indexMapOffsets = new long[dictChunks.size()];
+      int[] indexMapLens = new int[dictChunks.size()];
+      boolean[] sortState = new boolean[dictChunks.size()];
+      int i = 0;
+      for (DataChunk dataChunk : dictChunks) {
+        keyLengths[i] = dataChunk.getData_page_length();
+        keyOffSets[i] = dataChunk.getData_page_offset();
+        keyBlockIndexOffsets[i] = dataChunk.getRowid_page_offset();
+        keyBlockIndexLens[i] = dataChunk.getRowid_page_length();
+        indexMapOffsets[i] = dataChunk.getRle_page_offset();
+        indexMapLens[i] = dataChunk.getRle_page_length();
+        sortState[i] = dataChunk.getSort_state().equals(SortState.SORT_EXPLICIT) ? true : false;
+        i++;
+      }
+      blockletInfoColumnar.setKeyLengths(keyLengths);
+      blockletInfoColumnar.setKeyOffSets(keyOffSets);
+      blockletInfoColumnar.setKeyBlockIndexOffSets(keyBlockIndexOffsets);
+      blockletInfoColumnar.setKeyBlockIndexLength(keyBlockIndexLens);
+      blockletInfoColumnar.setDataIndexMapOffsets(indexMapOffsets);
+      blockletInfoColumnar.setDataIndexMapLength(indexMapLens);
+      blockletInfoColumnar.setIsSortedKeyColumn(sortState);
+
+      int[] msrLens = new int[nonDictColChunks.size()];
+      long[] msrOffsets = new long[nonDictColChunks.size()];
+      ValueEncoderMeta[] encoderMetas = new ValueEncoderMeta[nonDictColChunks.size()];
+      i = 0;
+      for (DataChunk msrChunk : nonDictColChunks) {
+        msrLens[i] = msrChunk.getData_page_length();
+        msrOffsets[i] = msrChunk.getData_page_offset();
+        encoderMetas[i] = deserializeValueEncoderMeta(msrChunk.getEncoder_meta().get(0));
+        i++;
+      }
+      blockletInfoColumnar.setMeasureLength(msrLens);
+      blockletInfoColumnar.setMeasureOffset(msrOffsets);
+      blockletInfoColumnar.setCompressionModel(getValueCompressionModel(encoderMetas));
+      listOfNodeInfo.add(blockletInfoColumnar);
+    }
+
+    setBlockletIndex(footer, listOfNodeInfo);
+    return listOfNodeInfo;
+  }
+
+  private static ValueEncoderMeta deserializeValueEncoderMeta(ByteBuffer byteBuffer)
+      throws IOException {
+    ByteArrayInputStream bis = new ByteArrayInputStream(byteBuffer.array());
+    ObjectInputStream objStream = new ObjectInputStream(bis);
+    ValueEncoderMeta encoderMeta = null;
+    try {
+      encoderMeta = (ValueEncoderMeta) objStream.readObject();
+    } catch (ClassNotFoundException e) {
+      LOGGER.error("Error while reading ValueEncoderMeta");
+    }
+    return encoderMeta;
+
+  }
+
+  private static ValueCompressionModel getValueCompressionModel(ValueEncoderMeta[] encoderMetas) {
+    Object[] maxValue = new Object[encoderMetas.length];
+    Object[] minValue = new Object[encoderMetas.length];
+    int[] decimalLength = new int[encoderMetas.length];
+    Object[] uniqueValue = new Object[encoderMetas.length];
+    char[] aggType = new char[encoderMetas.length];
+    byte[] dataTypeSelected = new byte[encoderMetas.length];
+    for (int i = 0; i < encoderMetas.length; i++) {
+      maxValue[i] = encoderMetas[i].getMaxValue();
+      minValue[i] = encoderMetas[i].getMinValue();
+      decimalLength[i] = encoderMetas[i].getDecimal();
+      uniqueValue[i] = encoderMetas[i].getUniqueValue();
+      aggType[i] = encoderMetas[i].getType();
+      dataTypeSelected[i] = encoderMetas[i].getDataTypeSelected();
+    }
+    return ValueCompressionUtil
+        .getValueCompressionModel(maxValue, minValue, decimalLength, uniqueValue, aggType,
+            dataTypeSelected);
+  }
+
+  private static void setBlockletIndex(FileFooter footer,
+      List<BlockletInfoColumnar> listOfNodeInfo) {
+    List<BlockletIndex> blockletIndexList = footer.getBlocklet_index_list();
+    for (int i = 0; i < blockletIndexList.size(); i++) {
+      BlockletBTreeIndex bTreeIndexList = blockletIndexList.get(i).getB_tree_index();
+      BlockletMinMaxIndex minMaxIndexList = blockletIndexList.get(i).getMin_max_index();
+
+      listOfNodeInfo.get(i).setStartKey(bTreeIndexList.getStart_key());
+      listOfNodeInfo.get(i).setEndKey(bTreeIndexList.getEnd_key());
+      byte[][] min = new byte[minMaxIndexList.getMin_values().size()][];
+      byte[][] max = new byte[minMaxIndexList.getMax_values().size()][];
+      for (int j = 0; j < minMaxIndexList.getMax_valuesSize(); j++) {
+        min[j] = minMaxIndexList.getMin_values().get(j).array();
+        max[j] = minMaxIndexList.getMax_values().get(j).array();
+      }
+      listOfNodeInfo.get(i).setColumnMaxData(max);
+    }
+  }
+
+  /**
+   * Below method will be used to get the index header
+   *
+   * @param columnCardinality cardinality of each column
+   * @param columnSchemaList  list of column present in the table
+   * @return Index header object
+   */
+  public static IndexHeader getIndexHeader(int[] columnCardinality,
+      List<ColumnSchema> columnSchemaList) {
+    // create segment info object
+    SegmentInfo segmentInfo = new SegmentInfo();
+    // set the number of columns
+    segmentInfo.setNum_cols(columnSchemaList.size());
+    // setting the column cardinality
+    segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
+    // create index header object
+    IndexHeader indexHeader = new IndexHeader();
+    // set the segment info
+    indexHeader.setSegment_info(segmentInfo);
+    // set the column names
+    indexHeader.setTable_columns(columnSchemaList);
+    return indexHeader;
+  }
+
+  /**
+   * Below method will be used to get the block index info thrift object for each block
+   * present in the segment
+   *
+   * @param blockIndexInfoList block index info list
+   * @return list of block index
+   */
+  public static List<BlockIndex> getBlockIndexInfo(List<BlockIndexInfo> blockIndexInfoList) {
+    List<BlockIndex> thriftBlockIndexList = new ArrayList<BlockIndex>();
+    BlockIndex blockIndex = null;
+    // below code to create block index info object for each block
+    for (BlockIndexInfo blockIndexInfo : blockIndexInfoList) {
+      blockIndex = new BlockIndex();
+      blockIndex.setNum_rows(blockIndexInfo.getNumberOfRows());
+      blockIndex.setOffset(blockIndexInfo.getNumberOfRows());
+      blockIndex.setFile_name(blockIndexInfo.getFileName());
+      blockIndex.setBlock_index(getBlockletIndex(blockIndexInfo.getBlockletIndex()));
+      thriftBlockIndexList.add(blockIndex);
+    }
+    return thriftBlockIndexList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
new file mode 100644
index 0000000..befd906
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+public final class CarbonProperties {
+  /**
+   * Attribute for Carbon LOGGER.
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonProperties.class.getName());
+
+  /**
+   * class instance.
+   */
+  private static final CarbonProperties CARBONPROPERTIESINSTANCE = new CarbonProperties();
+
+  /**
+   * porpeties .
+   */
+  private Properties carbonProperties;
+
+  /**
+   * Private constructor this will call load properties method to load all the
+   * carbon properties in memory.
+   */
+  private CarbonProperties() {
+    carbonProperties = new Properties();
+    loadProperties();
+    validateAndLoadDefaultProperties();
+  }
+
+  /**
+   * This method will be responsible for get this class instance
+   *
+   * @return carbon properties instance
+   */
+  public static CarbonProperties getInstance() {
+    return CARBONPROPERTIESINSTANCE;
+  }
+
+  /**
+   * This method validates the loaded properties and loads default
+   * values in case of wrong values.
+   */
+  private void validateAndLoadDefaultProperties() {
+    if (null == carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION)) {
+      carbonProperties.setProperty(CarbonCommonConstants.STORE_LOCATION,
+          CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
+    }
+
+    validateBlockletSize();
+    validateMaxFileSize();
+    validateNumCores();
+    validateNumCoresBlockSort();
+    validateSortSize();
+    validateBadRecordsLocation();
+    validateHighCardinalityIdentify();
+    validateHighCardinalityThreshold();
+    validateHighCardinalityInRowCountPercentage();
+  }
+
+  private void validateBadRecordsLocation() {
+    String badRecordsLocation =
+        carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    if (null == badRecordsLocation || badRecordsLocation.length() == 0) {
+      carbonProperties.setProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL);
+    }
+  }
+
+  /**
+   * This method validates the blocklet size
+   */
+  private void validateBlockletSize() {
+    String blockletSizeStr = carbonProperties.getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+        CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL);
+    try {
+      int blockletSize = Integer.parseInt(blockletSizeStr);
+
+      if (blockletSize < CarbonCommonConstants.BLOCKLET_SIZE_MIN_VAL
+          || blockletSize > CarbonCommonConstants.BLOCKLET_SIZE_MAX_VAL) {
+        LOGGER.info("The blocklet size value \"" + blockletSizeStr
+                + "\" is invalid. Using the default value \""
+                + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL);
+        carbonProperties.setProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+            CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.info("The blocklet size value \"" + blockletSizeStr
+              + "\" is invalid. Using the default value \""
+              + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL);
+      carbonProperties.setProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+          CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL);
+    }
+  }
+
+  /**
+   * TODO: This method validates the maximum number of blocklets per file ?
+   */
+  private void validateMaxFileSize() {
+    String maxFileSizeStr = carbonProperties.getProperty(CarbonCommonConstants.MAX_FILE_SIZE,
+        CarbonCommonConstants.MAX_FILE_SIZE_DEFAULT_VAL);
+    try {
+      int maxFileSize = Integer.parseInt(maxFileSizeStr);
+
+      if (maxFileSize < CarbonCommonConstants.MAX_FILE_SIZE_DEFAULT_VAL_MIN_VAL
+          || maxFileSize > CarbonCommonConstants.MAX_FILE_SIZE_DEFAULT_VAL_MAX_VAL) {
+        LOGGER.info("The max file size value \"" + maxFileSizeStr
+                + "\" is invalid. Using the default value \""
+                + CarbonCommonConstants.MAX_FILE_SIZE_DEFAULT_VAL);
+        carbonProperties.setProperty(CarbonCommonConstants.MAX_FILE_SIZE,
+            CarbonCommonConstants.MAX_FILE_SIZE_DEFAULT_VAL);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.info("The max file size value \"" + maxFileSizeStr
+              + "\" is invalid. Using the default value \""
+              + CarbonCommonConstants.MAX_FILE_SIZE_DEFAULT_VAL);
+
+      carbonProperties.setProperty(CarbonCommonConstants.MAX_FILE_SIZE,
+          CarbonCommonConstants.MAX_FILE_SIZE_DEFAULT_VAL);
+    }
+  }
+
+  /**
+   * This method validates the number cores specified
+   */
+  private void validateNumCores() {
+    String numCoresStr = carbonProperties
+        .getProperty(CarbonCommonConstants.NUM_CORES, CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+    try {
+      int numCores = Integer.parseInt(numCoresStr);
+
+      if (numCores < CarbonCommonConstants.NUM_CORES_MIN_VAL
+          || numCores > CarbonCommonConstants.NUM_CORES_MAX_VAL) {
+        LOGGER.info("The num Cores  value \"" + numCoresStr
+            + "\" is invalid. Using the default value \""
+            + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+        carbonProperties.setProperty(CarbonCommonConstants.NUM_CORES,
+            CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.info("The num Cores  value \"" + numCoresStr
+          + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+      carbonProperties.setProperty(CarbonCommonConstants.NUM_CORES,
+          CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+    }
+  }
+
+  /**
+   * This method validates the number cores specified for mdk block sort
+   */
+  private void validateNumCoresBlockSort() {
+    String numCoresStr = carbonProperties
+        .getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
+            CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL);
+    try {
+      int numCores = Integer.parseInt(numCoresStr);
+
+      if (numCores < CarbonCommonConstants.NUM_CORES_BLOCK_SORT_MIN_VAL
+          || numCores > CarbonCommonConstants.NUM_CORES_BLOCK_SORT_MAX_VAL) {
+        LOGGER.info("The num cores value \"" + numCoresStr
+            + "\" for block sort is invalid. Using the default value \""
+            + CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL);
+        carbonProperties.setProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
+            CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.info("The num cores value \"" + numCoresStr
+          + "\" for block sort is invalid. Using the default value \""
+          + CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL);
+      carbonProperties.setProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
+          CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL);
+    }
+  }
+
+  /**
+   * This method validates the sort size
+   */
+  private void validateSortSize() {
+    String sortSizeStr = carbonProperties
+        .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL);
+    try {
+      int sortSize = Integer.parseInt(sortSizeStr);
+
+      if (sortSize < CarbonCommonConstants.SORT_SIZE_MIN_VAL) {
+        LOGGER.info("The batch size value \"" + sortSizeStr
+            + "\" is invalid. Using the default value \""
+            + CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL);
+        carbonProperties.setProperty(CarbonCommonConstants.SORT_SIZE,
+            CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.info("The batch size value \"" + sortSizeStr
+          + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL);
+      carbonProperties.setProperty(CarbonCommonConstants.SORT_SIZE,
+          CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL);
+    }
+  }
+
+  private void validateHighCardinalityIdentify() {
+    String highcardIdentifyStr = carbonProperties.getProperty(
+        CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
+        CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT);
+    try {
+      Boolean.parseBoolean(highcardIdentifyStr);
+    } catch (NumberFormatException e) {
+      LOGGER.info("The high cardinality identify value \"" + highcardIdentifyStr
+          + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT);
+      carbonProperties.setProperty(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
+          CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT);
+    }
+  }
+
+  private void validateHighCardinalityThreshold() {
+    String highcardThresholdStr = carbonProperties.getProperty(
+        CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD,
+        CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT);
+    try {
+      int highcardThreshold = Integer.parseInt(highcardThresholdStr);
+      if(highcardThreshold < CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_MIN){
+        LOGGER.info("The high cardinality threshold value \"" + highcardThresholdStr
+            + "\" is invalid. Using the min value \""
+            + CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_MIN);
+        carbonProperties.setProperty(CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD,
+            CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_MIN + "");
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.info("The high cardinality threshold value \"" + highcardThresholdStr
+          + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT);
+      carbonProperties.setProperty(CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD,
+          CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT);
+    }
+  }
+
+  private void validateHighCardinalityInRowCountPercentage() {
+    String highcardPercentageStr = carbonProperties.getProperty(
+        CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
+        CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT);
+    try {
+      double highcardPercentage = Double.parseDouble(highcardPercentageStr);
+      if(highcardPercentage <= 0){
+        LOGGER.info("The percentage of high cardinality in row count value \""
+            + highcardPercentageStr + "\" is invalid. Using the default value \""
+            + CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT);
+        carbonProperties.setProperty(
+            CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
+            CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.info("The percentage of high cardinality in row count value \""
+          + highcardPercentageStr + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT);
+      carbonProperties.setProperty(CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
+          CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT);
+    }
+  }
+
+  /**
+   * This method will read all the properties from file and load it into
+   * memory
+   */
+  private void loadProperties() {
+    String property = System.getProperty("carbon.properties.filepath");
+    if (null == property) {
+      property = CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH;
+    }
+    File file = new File(property);
+    LOGGER.info("Property file path: " + file.getAbsolutePath());
+
+    FileInputStream fis = null;
+    try {
+      if (file.exists()) {
+        fis = new FileInputStream(file);
+
+        carbonProperties.load(fis);
+      }
+    } catch (FileNotFoundException e) {
+      LOGGER.error("The file: " + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH
+          + " does not exist");
+    } catch (IOException e) {
+      LOGGER.error("Error while reading the file: "
+          + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH);
+    } finally {
+      if (null != fis) {
+        try {
+          fis.close();
+        } catch (IOException e) {
+          LOGGER.error("Error while closing the file stream for file: "
+                  + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH);
+        }
+      }
+    }
+
+    print();
+  }
+
+  /**
+   * This method will be used to get the properties value
+   *
+   * @param key
+   * @return properties value
+   */
+  public String getProperty(String key) {
+    //TODO temporary fix
+    if ("carbon.leaf.node.size".equals(key)) {
+      return "120000";
+    }
+    return carbonProperties.getProperty(key);
+  }
+
+  /**
+   * This method will be used to get the properties value if property is not
+   * present then it will return tghe default value
+   *
+   * @param key
+   * @return properties value
+   */
+  public String getProperty(String key, String defaultValue) {
+    String value = getProperty(key);
+    if (null == value) {
+      return defaultValue;
+    }
+    return value;
+  }
+
+  /**
+   * This method will be used to add a new property
+   *
+   * @param key
+   * @return properties value
+   */
+  public void addProperty(String key, String value) {
+    carbonProperties.setProperty(key, value);
+
+  }
+
+  /**
+   * Validate the restrictions
+   *
+   * @param actual
+   * @param max
+   * @param min
+   * @param defaultVal
+   * @return
+   */
+  public long validate(long actual, long max, long min, long defaultVal) {
+    if (actual <= max && actual >= min) {
+      return actual;
+    }
+    return defaultVal;
+  }
+
+  /**
+   * returns major compaction size value from carbon properties or default value if it is not valid
+   *
+   * @return
+   */
+  public long getMajorCompactionSize() {
+    long compactionSize;
+    try {
+      compactionSize = Long.parseLong(getProperty(CarbonCommonConstants.MAJOR_COMPACTION_SIZE,
+          CarbonCommonConstants.DEFAULT_MAJOR_COMPACTION_SIZE));
+    } catch (NumberFormatException e) {
+      compactionSize = Long.parseLong(CarbonCommonConstants.DEFAULT_MAJOR_COMPACTION_SIZE);
+    }
+    return compactionSize;
+  }
+
+  /**
+   * returns the number of loads to be preserved.
+   *
+   * @return
+   */
+  public int getNumberOfSegmentsToBePreserved() {
+    int numberOfSegmentsToBePreserved;
+    try {
+      numberOfSegmentsToBePreserved = Integer.parseInt(
+          getProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+              CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER));
+      // checking min and max . 0  , 100 is min & max.
+      if (numberOfSegmentsToBePreserved < 0 || numberOfSegmentsToBePreserved > 100) {
+        LOGGER.error("The specified value for property "
+            + CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER + " is incorrect."
+            + " Correct value should be in range of 0 -100. Taking the default value.");
+        numberOfSegmentsToBePreserved =
+            Integer.parseInt(CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER);
+      }
+    } catch (NumberFormatException e) {
+      numberOfSegmentsToBePreserved =
+          Integer.parseInt(CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER);
+    }
+    return numberOfSegmentsToBePreserved;
+  }
+
+  public void print() {
+    LOGGER.info("------Using Carbon.properties --------");
+    LOGGER.info(carbonProperties.toString());
+  }
+
+  /**
+   * gettting the unmerged segment numbers to be merged.
+   * @return
+   */
+  public int[] getCompactionSegmentLevelCount() {
+    String commaSeparatedLevels;
+
+    commaSeparatedLevels = getProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
+    int[] compactionSize = getIntArray(commaSeparatedLevels);
+
+    if(null == compactionSize){
+      compactionSize = getIntArray(CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
+    }
+
+    return compactionSize;
+  }
+
+  /**
+   *
+   * @param commaSeparatedLevels
+   * @return
+   */
+  private int[] getIntArray(String commaSeparatedLevels) {
+    String[] levels = commaSeparatedLevels.split(",");
+    int[] compactionSize = new int[levels.length];
+    int i = 0;
+    for (String levelSize : levels) {
+      try {
+        int size = Integer.parseInt(levelSize.trim());
+        if(validate(size,100,0,-1) < 0 ){
+          // if given size is out of boundary then take default value for all levels.
+          return null;
+        }
+        compactionSize[i++] = size;
+      }
+      catch(NumberFormatException e){
+        LOGGER.error(
+            "Given value for property" + CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD
+                + " is not proper. Taking the default value "
+                + CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
+        return null;
+      }
+    }
+    return compactionSize;
+  }
+
+  /**
+   * Validate the restrictions
+   *
+   * @param actual
+   * @param max
+   * @param min
+   * @param defaultVal
+   * @return
+   */
+  public int validate(int actual, int max, int min, int defaultVal) {
+    if (actual <= max && actual >= min) {
+      return actual;
+    }
+    return defaultVal;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/CarbonTimeStatisticsFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonTimeStatisticsFactory.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonTimeStatisticsFactory.java
new file mode 100644
index 0000000..c7c2b8a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonTimeStatisticsFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+public class CarbonTimeStatisticsFactory {
+  private static String LoadStatisticsInstanceType;
+  private static LoadStatistics LoadStatisticsInstance;
+
+  static {
+    CarbonTimeStatisticsFactory.updateTimeStatisticsUtilStatus();
+    LoadStatisticsInstance = genLoadStatisticsInstance();
+  }
+
+  private static void updateTimeStatisticsUtilStatus() {
+    LoadStatisticsInstanceType = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_DATA_LOADING_STATISTICS,
+            CarbonCommonConstants.ENABLE_DATA_LOADING_STATISTICS_DEFAULT);
+  }
+
+  private static LoadStatistics genLoadStatisticsInstance() {
+    switch (LoadStatisticsInstanceType.toLowerCase()) {
+      case "false":
+        return CarbonLoadStatisticsDummy.getInstance();
+      case "true":
+        return CarbonLoadStatisticsImpl.getInstance();
+      default:
+        return CarbonLoadStatisticsDummy.getInstance();
+    }
+  }
+
+  public static LoadStatistics getLoadStatisticsInstance() {
+    return LoadStatisticsInstance;
+  }
+
+}