Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/15 07:08:56 UTC

[11/52] [partial] incubator-carbondata git commit: Renamed packages to org.apache.carbondata and fixed errors

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
deleted file mode 100644
index eb8a0fc..0000000
--- a/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
+++ /dev/null
@@ -1,1426 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.util;
-
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.cache.dictionary.Dictionary;
-import org.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
-import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
-import org.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
-import org.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
-import org.carbondata.core.carbon.metadata.datatype.DataType;
-import org.carbondata.core.carbon.metadata.encoder.Encoding;
-import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
-import org.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
-import org.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
-import org.carbondata.core.carbon.path.CarbonStorePath;
-import org.carbondata.core.carbon.path.CarbonTablePath;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.datastorage.store.columnar.ColumnGroupModel;
-import org.carbondata.core.datastorage.store.columnar.ColumnarKeyStoreDataHolder;
-import org.carbondata.core.datastorage.store.columnar.UnBlockIndexer;
-import org.carbondata.core.datastorage.store.compression.MeasureMetaDataModel;
-import org.carbondata.core.datastorage.store.compression.ValueCompressionModel;
-import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.carbondata.core.datastorage.store.impl.FileFactory;
-import org.carbondata.core.keygenerator.mdkey.NumberCompressor;
-import org.carbondata.core.metadata.ValueEncoderMeta;
-import org.carbondata.scan.model.QueryDimension;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.pentaho.di.core.exception.KettleException;
-
-
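-/**
- * General-purpose utility methods for the Carbon core module: stream and
- * file handling, cardinality and bit-length math, and metadata readers.
- */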
-public final class CarbonUtil {
-
-  public static final String HDFS_PREFIX = "hdfs://";
-  public static final String VIEWFS_PREFIX = "viewfs://";
-  private static final String FS_DEFAULT_FS = "fs.defaultFS";
-
-  /**
-   * Attribute for Carbon LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonUtil.class.getName());
-
-  /**
-   * bits in a byte
-   */
-  private static final int CONST_EIGHT = 8;
-
-  /**
-   * remainder that marks the byte-boundary condition
-   */
-  private static final int CONST_SEVEN = 7;
-
-  /**
-   * divisor for percentage calculations
-   */
-  private static final int CONST_HUNDRED = 100;
-
-  private static final Configuration conf = new Configuration(true);
-
-  private CarbonUtil() {
-
-  }
-
-  /**
-   * This method closes the streams
-   *
-   * @param streams - streams to close.
-   */
-  public static void closeStreams(Closeable... streams) {
-    // Added if to avoid NullPointerException in case one stream is being passed as null
-    if (null != streams) {
-      for (Closeable stream : streams) {
-        if (null != stream) {
-          try {
-            stream.close();
-          } catch (IOException e) {
-            LOGGER.error("Error while closing stream" + stream);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * @param baseStorePath base store location to create if absent
-   * @return 1 on success, -1 on failure
-   */
-  private static int createBaseStoreFolders(String baseStorePath) {
-    FileFactory.FileType fileType = FileFactory.getFileType(baseStorePath);
-    try {
-      if (!FileFactory.isFileExist(baseStorePath, fileType, false)) {
-        if (!FileFactory.mkdirs(baseStorePath, fileType)) {
-          return -1;
-        }
-      }
-    } catch (Exception e) {
-      return -1;
-    }
-    return 1;
-  }
-
-  /**
-   * This method checks whether the restructure folder exists and, if it
-   * does not, returns the counter with which the folder needs to be created.
-   *
-   * @param baseStorePath -
-   *                      base location where the folder will be created.
-   * @return counter
-   * counter with which the folder will be created.
-   */
-  public static int checkAndReturnCurrentRestructFolderNumber(String baseStorePath,
-      final String filterType, final boolean isDirectory) {
-    if (null == baseStorePath || 0 == baseStorePath.length()) {
-      return -1;
-    }
-    // normalize backslashes to forward slashes
-    baseStorePath = baseStorePath.replace("\\", "/");
-
-    // if the string ends with '/', remove the trailing slash.
-    if (baseStorePath.charAt(baseStorePath.length() - 1) == '/') {
-      baseStorePath = baseStorePath.substring(0, baseStorePath.lastIndexOf("/"));
-    }
-    int retValue = createBaseStoreFolders(baseStorePath);
-    if (-1 == retValue) {
-      return retValue;
-    }
-
-    CarbonFile carbonFile =
-        FileFactory.getCarbonFile(baseStorePath, FileFactory.getFileType(baseStorePath));
-
-    // List of directories
-    CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile pathname) {
-        if (isDirectory && pathname.isDirectory()) {
-          if (pathname.getAbsolutePath().indexOf(filterType) > -1) {
-            return true;
-          }
-        } else {
-          if (pathname.getAbsolutePath().indexOf(filterType) > -1) {
-            return true;
-          }
-        }
-
-        return false;
-      }
-    });
-
-    int counter = -1;
-
-    // if no folder exists then return -1
-    if (listFiles.length == 0) {
-      return counter;
-    }
-
-    counter = findCounterValue(filterType, listFiles, counter);
-    return counter;
-  }
-
-  public static int checkAndReturnCurrentLoadFolderNumber(String baseStorePath) {
-    return checkAndReturnCurrentRestructFolderNumber(baseStorePath, "Load_", true);
-  }
-
-  /**
-   * @param filterType
-   * @param listFiles
-   * @param counter
-   * @return
-   */
-  private static int findCounterValue(final String filterType, CarbonFile[] listFiles,
-      int counter) {
-    if ("Load_".equals(filterType)) {
-      for (CarbonFile files : listFiles) {
-        String folderName = getFolderName(files);
-        if (folderName.indexOf('.') > -1) {
-          folderName = folderName.substring(0, folderName.indexOf('.'));
-        }
-        String[] split = folderName.split("_");
-
-        if (split.length > 1 && counter < Integer.parseInt(split[1])) {
-          counter = Integer.parseInt(split[1]);
-        }
-      }
-    } else {
-      // Iterate list of Directories and find the counter value
-      for (CarbonFile eachFile : listFiles) {
-        String folderName = getFolderName(eachFile);
-        String[] split = folderName.split("_");
-        if (counter < Integer.parseInt(split[1])) {
-          counter = Integer.parseInt(split[1]);
-        }
-      }
-    }
-    return counter;
-  }
-
-  /**
-   * @param eachFile
-   * @return
-   */
-  private static String getFolderName(CarbonFile eachFile) {
-    String str = eachFile.getAbsolutePath();
-    str = str.replace("\\", "/");
-    int firstFolderIndex = str.lastIndexOf("/");
-    String folderName = str.substring(firstFolderIndex);
-    return folderName;
-  }
-
-  /**
-   * This method will be used to update the dimension cardinality
-   *
-   * @param dimCardinality
-   * @return new incremented cardinality
-   */
-  public static int[] getIncrementedCardinality(int[] dimCardinality) {
-    // get the cardinality incr factor
-    final int incrValue = CarbonCommonConstants.CARDINALITY_INCREMENT_VALUE_DEFAULT_VAL;
-
-    int perIncr = 0;
-    int remainder = 0;
-    int[] newDimsC = new int[dimCardinality.length];
-    for (int i = 0; i < dimCardinality.length; i++) {
-      // get the incr
-      perIncr = (dimCardinality[i] * incrValue) / CONST_HUNDRED;
-
-      // if the percentage increment is at least one, add it to the cardinality
-      if (perIncr > 0) {
-        newDimsC[i] = dimCardinality[i] + perIncr;
-      } else {
-        // else add one
-        newDimsC[i] = dimCardinality[i] + 1;
-      }
-      // check whether it is at the boundary condition
-      remainder = newDimsC[i] % CONST_EIGHT;
-      if (remainder == CONST_SEVEN) {
-        // then incr cardinality by 1
-        newDimsC[i] = dimCardinality[i] + 1;
-      }
-    }
-    // get the log bits of cardinality
-    for (int i = 0; i < newDimsC.length; i++) {
-      newDimsC[i] = Long.toBinaryString(newDimsC[i]).length();
-    }
-    return newDimsC;
-  }
-
-  public static int getIncrementedCardinality(int dimCardinality) {
-    // get the cardinality incr factor
-    final int incrValue = CarbonCommonConstants.CARDINALITY_INCREMENT_VALUE_DEFAULT_VAL;
-
-    int perIncr = 0;
-    int remainder = 0;
-    int newDimsC = 0;
-
-    // get the incr
-    perIncr = (dimCardinality * incrValue) / CONST_HUNDRED;
-
-    // if the percentage increment is at least one, add it to the cardinality
-    if (perIncr > 0) {
-      newDimsC = dimCardinality + perIncr;
-    } else {
-      // else add one
-      newDimsC = dimCardinality + 1;
-    }
-    // check whether it is at the boundary condition
-    remainder = newDimsC % CONST_EIGHT;
-    if (remainder == CONST_SEVEN) {
-      // then incr cardinality by 1
-      newDimsC = dimCardinality + 1;
-    }
-    // get the log bits of cardinality
-    newDimsC = Long.toBinaryString(newDimsC).length();
-
-    return newDimsC;
-  }
-
-  /**
-   * Builds a ColumnGroupModel from the given column groups; see ColumnGroupModel for details.
-   *
-   * @param columnGroups : column groups
-   * @return ColumnGroupModel  model
-   */
-  public static ColumnGroupModel getColGroupModel(int[][] columnGroups) {
-    int[] columnSplit = new int[columnGroups.length];
-    int noOfColumnStore = columnSplit.length;
-    boolean[] columnarStore = new boolean[noOfColumnStore];
-
-    for (int i = 0; i < columnGroups.length; i++) {
-      columnSplit[i] = columnGroups[i].length;
-      columnarStore[i] = columnGroups[i].length <= 1;
-    }
-    ColumnGroupModel colGroupModel = new ColumnGroupModel();
-    colGroupModel.setNoOfColumnStore(noOfColumnStore);
-    colGroupModel.setColumnSplit(columnSplit);
-    colGroupModel.setColumnarStore(columnarStore);
-    colGroupModel.setColumnGroup(columnGroups);
-    return colGroupModel;
-  }
-
-  /**
-   * This method updates the dimension cardinality so that each dimension's
-   * bit length is rounded up to a whole number of bytes
-   *
-   * @param dimCardinality
-   * @return new incremented cardinality
-   */
-  public static int[] getIncrementedCardinalityFullyFilled(int[] dimCardinality) {
-    int[] newDimsC = new int[dimCardinality.length];
-    // get the log bits of cardinality
-    for (int i = 0; i < dimCardinality.length; i++) {
-      if (dimCardinality[i] == 0) {
-        // array or struct types may have a higher cardinality, so reserve 64 bits
-        newDimsC[i] = 64;
-      } else {
-        int bitsLength = Long.toBinaryString(dimCardinality[i]).length();
-        int div = bitsLength / 8;
-        int mod = bitsLength % 8;
-        if (mod > 0) {
-          newDimsC[i] = 8 * (div + 1);
-        } else {
-          newDimsC[i] = bitsLength;
-        }
-      }
-    }
-    return newDimsC;
-  }
-
-  private static int getBitLengthFullyFilled(int dimlens) {
-    int bitsLength = Long.toBinaryString(dimlens).length();
-    int div = bitsLength / 8;
-    int mod = bitsLength % 8;
-    if (mod > 0) {
-      return 8 * (div + 1);
-    } else {
-      return bitsLength;
-    }
-  }
-
-  /**
-   * This method will be used to delete the folder and files
-   *
-   * @param path file path array
-   * @throws CarbonUtilException if deletion fails
-   */
-  public static void deleteFoldersAndFiles(final File... path) throws CarbonUtilException {
-    try {
-      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
-
-        @Override public Void run() throws Exception {
-          for (int i = 0; i < path.length; i++) {
-            deleteRecursive(path[i]);
-          }
-          return null;
-        }
-      });
-    } catch (IOException | InterruptedException e) {
-      throw new CarbonUtilException("Error while deleting the folders and files", e);
-    }
-
-  }
-
-  /**
-   * Recursively delete the files
-   *
-   * @param f File to be deleted
-   * @throws CarbonUtilException
-   */
-  private static void deleteRecursive(File f) throws CarbonUtilException {
-    if (f.isDirectory()) {
-      if (f.listFiles() != null) {
-        for (File c : f.listFiles()) {
-          deleteRecursive(c);
-        }
-      }
-    }
-    if (f.exists() && !f.delete()) {
-      throw new CarbonUtilException("Error while deleting the folders and files");
-    }
-  }
-
-  public static void deleteFoldersAndFiles(final CarbonFile... file) throws CarbonUtilException {
-    try {
-      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
-
-        @Override public Void run() throws Exception {
-          for (int i = 0; i < file.length; i++) {
-            deleteRecursive(file[i]);
-          }
-          return null;
-        }
-      });
-    } catch (IOException | InterruptedException e) {
-      throw new CarbonUtilException("Error while deleting the folders and files", e);
-    }
-  }
-
-  public static String getBadLogPath(String storeLocation) {
-    String badLogStoreLocation =
-            CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
-    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
-
-    return badLogStoreLocation;
-  }
-
-  public static void deleteFoldersAndFilesSilent(final CarbonFile... file)
-      throws CarbonUtilException {
-    try {
-      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
-
-        @Override public Void run() throws Exception {
-          for (int i = 0; i < file.length; i++) {
-            deleteRecursiveSilent(file[i]);
-          }
-          return null;
-        }
-      });
-    } catch (IOException | InterruptedException e) {
-      throw new CarbonUtilException("Error while deleting the folders and files", e);
-    }
-  }
-
-  /**
-   * This function will rename the table to be deleted
-   *
-   * @param partitionCount
-   * @param storePath
-   * @param databaseName
-   * @param tableName
-   */
-  public static void renameTableForDeletion(int partitionCount, String storePath,
-      String databaseName, String tableName) {
-    String tableNameWithPartition = "";
-    String databaseNameWithPartition = "";
-    String fullPath = "";
-    String newFilePath = "";
-    String newFileName = "";
-    Callable<Void> c = null;
-    long time = System.currentTimeMillis();
-    FileFactory.FileType fileType = null;
-    ExecutorService executorService = Executors.newFixedThreadPool(10);
-    for (int i = 0; i < partitionCount; i++) {
-      databaseNameWithPartition = databaseName + '_' + i;
-      tableNameWithPartition = tableName + '_' + i;
-      newFileName = tableNameWithPartition + '_' + time;
-      fullPath = storePath + File.separator + databaseNameWithPartition + File.separator
-          + tableNameWithPartition;
-      newFilePath =
-          storePath + File.separator + databaseNameWithPartition + File.separator + newFileName;
-      fileType = FileFactory.getFileType(fullPath);
-      try {
-        if (FileFactory.isFileExist(fullPath, fileType)) {
-          CarbonFile file = FileFactory.getCarbonFile(fullPath, fileType);
-          boolean isRenameSuccessful = file.renameTo(newFilePath);
-          if (!isRenameSuccessful) {
-            LOGGER.error("Problem renaming the table :: " + fullPath);
-            c = new DeleteFolderAndFiles(file);
-            executorService.submit(c);
-          } else {
-            c = new DeleteFolderAndFiles(FileFactory.getCarbonFile(newFilePath, fileType));
-            executorService.submit(c);
-          }
-        }
-      } catch (IOException e) {
-        LOGGER.error("Problem renaming the table :: " + fullPath);
-      }
-    }
-    executorService.shutdown();
-  }
-
-  /**
-   * Recursively delete the files
-   *
-   * @param f File to be deleted
-   * @throws CarbonUtilException
-   */
-  private static void deleteRecursive(CarbonFile f) throws CarbonUtilException {
-    if (f.isDirectory()) {
-      if (f.listFiles() != null) {
-        for (CarbonFile c : f.listFiles()) {
-          deleteRecursive(c);
-        }
-      }
-    }
-    if (f.exists() && !f.delete()) {
-      throw new CarbonUtilException("Error while deleting the folders and files");
-    }
-  }
-
-  private static void deleteRecursiveSilent(CarbonFile f) throws CarbonUtilException {
-    if (f.isDirectory()) {
-      if (f.listFiles() != null) {
-        for (CarbonFile c : f.listFiles()) {
-          deleteRecursiveSilent(c);
-        }
-      }
-    }
-    if (f.exists() && !f.delete()) {
-      return;
-    }
-  }
-
-  public static void deleteFiles(File[] intermediateFiles) throws CarbonUtilException {
-    for (int i = 0; i < intermediateFiles.length; i++) {
-      if (!intermediateFiles[i].delete()) {
-        throw new CarbonUtilException("Problem while deleting intermediate file");
-      }
-    }
-  }
-
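-  /**
-   * Reconstructs the complete key byte array from the columnar key store
-   * blocks; unsorted blocks are written back to their original row
-   * positions using the stored column (inverted) index.
-   */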
-  public static byte[] getKeyArray(ColumnarKeyStoreDataHolder[] columnarKeyStoreDataHolder,
-      int totalKeySize, int eachKeySize) {
-    byte[] completeKeyArray = new byte[totalKeySize];
-    byte[] keyBlockData = null;
-    int destinationPosition = 0;
-    int[] columnIndex = null;
-    int blockKeySize = 0;
-    for (int i = 0; i < columnarKeyStoreDataHolder.length; i++) {
-      keyBlockData = columnarKeyStoreDataHolder[i].getKeyBlockData();
-      blockKeySize = columnarKeyStoreDataHolder[i].getColumnarKeyStoreMetadata().getEachRowSize();
-      if (columnarKeyStoreDataHolder[i].getColumnarKeyStoreMetadata().isSorted()) {
-        for (int j = 0; j < keyBlockData.length; j += blockKeySize) {
-          System.arraycopy(keyBlockData, j, completeKeyArray, destinationPosition, blockKeySize);
-          destinationPosition += eachKeySize;
-        }
-      } else {
-        columnIndex = columnarKeyStoreDataHolder[i].getColumnarKeyStoreMetadata().getColumnIndex();
-
-        for (int j = 0; j < columnIndex.length; j++) {
-          System.arraycopy(keyBlockData, columnIndex[j] * blockKeySize, completeKeyArray,
-              eachKeySize * columnIndex[j] + destinationPosition, blockKeySize);
-        }
-      }
-      destinationPosition = blockKeySize;
-    }
-    return completeKeyArray;
-  }
-
-  public static byte[] getKeyArray(ColumnarKeyStoreDataHolder[] columnarKeyStoreDataHolder,
-      int totalKeySize, int eachKeySize, short[] columnIndex) {
-    byte[] completeKeyArray = new byte[totalKeySize];
-    byte[] keyBlockData = null;
-    int destinationPosition = 0;
-    int blockKeySize = 0;
-    for (int i = 0; i < columnarKeyStoreDataHolder.length; i++) {
-      keyBlockData = columnarKeyStoreDataHolder[i].getKeyBlockData();
-      blockKeySize = columnarKeyStoreDataHolder[i].getColumnarKeyStoreMetadata().getEachRowSize();
-
-      for (int j = 0; j < columnIndex.length; j++) {
-        System.arraycopy(keyBlockData, columnIndex[j] * blockKeySize, completeKeyArray,
-            destinationPosition, blockKeySize);
-        destinationPosition += eachKeySize;
-      }
-      destinationPosition = blockKeySize;
-    }
-    return completeKeyArray;
-  }
-
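-  /**
-   * Binary search over a fixed-length dimension chunk. Returns the first
-   * matching row index (or the last match when matchUpLimit is true); if the
-   * value is absent, returns -(insertionPoint + 1), as Arrays.binarySearch does.
-   */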
-  public static int getFirstIndexUsingBinarySearch(FixedLengthDimensionDataChunk dimColumnDataChunk,
-      int low, int high, byte[] compareValue, boolean matchUpLimit) {
-    int cmpResult = 0;
-    while (high >= low) {
-      int mid = (low + high) / 2;
-      cmpResult = ByteUtil.UnsafeComparer.INSTANCE
-          .compareTo(dimColumnDataChunk.getCompleteDataChunk(), mid * compareValue.length,
-              compareValue.length, compareValue, 0, compareValue.length);
-      if (cmpResult < 0) {
-        low = mid + 1;
-      } else if (cmpResult > 0) {
-        high = mid - 1;
-      } else {
-        int currentIndex = mid;
-        if (!matchUpLimit) {
-          while (currentIndex - 1 >= 0 && ByteUtil.UnsafeComparer.INSTANCE
-              .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
-                  (currentIndex - 1) * compareValue.length, compareValue.length, compareValue, 0,
-                  compareValue.length) == 0) {
-            --currentIndex;
-          }
-        } else {
-          while (currentIndex + 1 <= high && ByteUtil.UnsafeComparer.INSTANCE
-              .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
-                  (currentIndex + 1) * compareValue.length, compareValue.length, compareValue, 0,
-                  compareValue.length) == 0) {
-            currentIndex++;
-          }
-        }
-        return currentIndex;
-      }
-    }
-    return -(low + 1);
-  }
-
-  /**
-   * Method will identify the index of the value just less than the pivot
-   * element on which the range filter is being applied.
-   *
-   * @param currentIndex
-   * @param dimColumnDataChunk
-   * @param compareValue
-   * @return index value
-   */
-  public static int nextLesserValueToTarget(int currentIndex,
-      FixedLengthDimensionDataChunk dimColumnDataChunk, byte[] compareValue) {
-    while (currentIndex - 1 >= 0 && ByteUtil.UnsafeComparer.INSTANCE
-        .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
-            (currentIndex - 1) * compareValue.length, compareValue.length, compareValue, 0,
-            compareValue.length) >= 0) {
-      --currentIndex;
-    }
-
-    return --currentIndex;
-  }
-
-  /**
-   * Method will identify the index of the value just greater than the pivot
-   * element on which the range filter is being applied.
-   *
-   * @param currentIndex
-   * @param dimColumnDataChunk
-   * @param compareValue
-   * @param numberOfRows
-   * @return index value
-   */
-  public static int nextGreaterValueToTarget(int currentIndex,
-      FixedLengthDimensionDataChunk dimColumnDataChunk, byte[] compareValue, int numberOfRows) {
-    while (currentIndex + 1 < numberOfRows && ByteUtil.UnsafeComparer.INSTANCE
-        .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
-            (currentIndex + 1) * compareValue.length, compareValue.length, compareValue, 0,
-            compareValue.length) <= 0) {
-      ++currentIndex;
-    }
-
-    return ++currentIndex;
-  }
-
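-  /**
-   * Splits the stored column index bytes into the index data and the index
-   * map (the leading int holds the index data length) and uncompresses them
-   * back into a row-level inverted index.
-   */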
-  public static int[] getUnCompressColumnIndex(int totalLength, byte[] columnIndexData,
-      NumberCompressor numberCompressor) {
-    ByteBuffer buffer = ByteBuffer.wrap(columnIndexData);
-    buffer.rewind();
-    int indexDataLength = buffer.getInt();
-    byte[] indexData = new byte[indexDataLength];
-    byte[] indexMap =
-        new byte[totalLength - indexDataLength - CarbonCommonConstants.INT_SIZE_IN_BYTE];
-    buffer.get(indexData);
-    buffer.get(indexMap);
-    return UnBlockIndexer.uncompressIndex(numberCompressor.unCompress(indexData),
-        numberCompressor.unCompress(indexMap));
-  }
-
-  /**
-   * Convert int array to Integer list
-   *
-   * @param array
-   * @return List<Integer>
-   */
-  public static List<Integer> convertToIntegerList(int[] array) {
-    List<Integer> integers = new ArrayList<Integer>();
-    for (int i = 0; i < array.length; i++) {
-      integers.add(array[i]);
-    }
-    return integers;
-  }
-
-  /**
-   * Read level metadata file and return cardinality
-   *
-   * @param levelPath
-   * @return
-   * @throws CarbonUtilException
-   */
-  public static int[] getCardinalityFromLevelMetadataFile(String levelPath)
-      throws CarbonUtilException {
-    DataInputStream dataInputStream = null;
-    int[] cardinality = null;
-
-    try {
-      if (FileFactory.isFileExist(levelPath, FileFactory.getFileType(levelPath))) {
-        dataInputStream =
-            FileFactory.getDataInputStream(levelPath, FileFactory.getFileType(levelPath));
-
-        cardinality = new int[dataInputStream.readInt()];
-
-        for (int i = 0; i < cardinality.length; i++) {
-          cardinality[i] = dataInputStream.readInt();
-        }
-      }
-    } catch (FileNotFoundException e) {
-      throw new CarbonUtilException("Problem while getting the file", e);
-    } catch (IOException e) {
-      throw new CarbonUtilException("Problem while reading the file", e);
-    } finally {
-      closeStreams(dataInputStream);
-    }
-
-    return cardinality;
-  }
-
-  public static void writeLevelCardinalityFile(String loadFolderLoc, String tableName,
-      int[] dimCardinality) throws KettleException {
-    String levelCardinalityFilePath = loadFolderLoc + File.separator +
-        CarbonCommonConstants.LEVEL_METADATA_FILE + tableName
-        + CarbonCommonConstants.CARBON_METADATA_EXTENSION;
-    FileOutputStream fileOutputStream = null;
-    FileChannel channel = null;
-    try {
-      int dimCardinalityArrLength = dimCardinality.length;
-
-      // first four bytes for writing the length of array, remaining for array data
-      ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE
-          + dimCardinalityArrLength * CarbonCommonConstants.INT_SIZE_IN_BYTE);
-
-      fileOutputStream = new FileOutputStream(levelCardinalityFilePath);
-      channel = fileOutputStream.getChannel();
-      buffer.putInt(dimCardinalityArrLength);
-
-      for (int i = 0; i < dimCardinalityArrLength; i++) {
-        buffer.putInt(dimCardinality[i]);
-      }
-
-      buffer.flip();
-      channel.write(buffer);
-      buffer.clear();
-
-      LOGGER.info("Level cardinality file written to : " + levelCardinalityFilePath);
-    } catch (IOException e) {
-      LOGGER.error("Error while writing level cardinality file : " + levelCardinalityFilePath + e
-          .getMessage());
-      throw new KettleException("Not able to write level cardinality file", e);
-    } finally {
-      closeStreams(channel, fileOutputStream);
-    }
-  }
-
-  /**
-   * From beeline, if a delimiter is passed as \001, in code we receive it as
-   * the escaped string \\001. This method unescapes the slash again and
-   * converts it back to \001
-   *
-   * @param parseStr
-   * @return
-   */
-  public static String unescapeChar(String parseStr) {
-    switch (parseStr) {
-      case "\\001":
-        return "\001";
-      case "\\t":
-        return "\t";
-      case "\\r":
-        return "\r";
-      case "\\b":
-        return "\b";
-      case "\\f":
-        return "\f";
-      case "\\n":
-        return "\n";
-      default:
-        return parseStr;
-    }
-  }
-
-  public static String escapeComplexDelimiterChar(String parseStr) {
-    switch (parseStr) {
-      case "$":
-        return "\\$";
-      case ":":
-        return "\\:";
-      default:
-        return parseStr;
-    }
-  }
-
-  /**
-   * Prepends the HDFS base URL, if configured, to paths in SHOW CREATE and LOAD DATA SQL
-   *
-   * @param filePath
-   */
-  public static String checkAndAppendHDFSUrl(String filePath) {
-    String currentPath = filePath;
-    if (null != filePath && filePath.length() != 0 &&
-        FileFactory.getFileType(filePath) != FileFactory.FileType.HDFS &&
-        FileFactory.getFileType(filePath) != FileFactory.FileType.VIEWFS) {
-      String baseDFSUrl = CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL);
-      if (null != baseDFSUrl) {
-        String dfsUrl = conf.get(FS_DEFAULT_FS);
-        if (null != dfsUrl && (dfsUrl.startsWith(HDFS_PREFIX) || dfsUrl
-            .startsWith(VIEWFS_PREFIX))) {
-          baseDFSUrl = dfsUrl + baseDFSUrl;
-        }
-        if (baseDFSUrl.endsWith("/")) {
-          baseDFSUrl = baseDFSUrl.substring(0, baseDFSUrl.length() - 1);
-        }
-        if (!filePath.startsWith("/")) {
-          filePath = "/" + filePath;
-        }
-        currentPath = baseDFSUrl + filePath;
-      }
-    }
-    return currentPath;
-  }
-
-  /**
-   * @param location
-   * @param factTableName
-   * @return
-   */
-  public static int getRestructureNumber(String location, String factTableName) {
-    String restructName =
-        location.substring(location.indexOf(CarbonCommonConstants.RESTRUCTRE_FOLDER));
-    int factTableIndex = restructName.indexOf(factTableName) - 1;
-    String restructNumber =
-        restructName.substring(CarbonCommonConstants.RESTRUCTRE_FOLDER.length(), factTableIndex);
-    return Integer.parseInt(restructNumber);
-  }
-
-  /**
-   * Below method will be used to get the aggregator type
-   * CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE is returned for double measures,
-   * CarbonCommonConstants.BYTE_VALUE_MEASURE is returned for byte-array values
-   *
-   * @param agg
-   * @return aggregator type
-   */
-  public static char getType(String agg) {
-    if (CarbonCommonConstants.SUM.equals(agg) || CarbonCommonConstants.COUNT.equals(agg)) {
-      return CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE;
-    } else {
-      return CarbonCommonConstants.BYTE_VALUE_MEASURE;
-    }
-  }
-
-  public static String getCarbonStorePath(String databaseName, String tableName) {
-    CarbonProperties prop = CarbonProperties.getInstance();
-    if (null == prop) {
-      return null;
-    }
-    String basePath = prop.getProperty(CarbonCommonConstants.STORE_LOCATION,
-        CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
-    return basePath;
-  }
-
-  /**
-   * This method will check the existence of a file at a given path
-   */
-  public static boolean isFileExists(String fileName) {
-    try {
-      FileFactory.FileType fileType = FileFactory.getFileType(fileName);
-      if (FileFactory.isFileExist(fileName, fileType)) {
-        return true;
-      }
-    } catch (IOException e) {
-      LOGGER.error("@@@@@@  File not found at a given location @@@@@@ : " + fileName);
-    }
-    return false;
-  }
-
-  /**
-   * This method will check and create the given path
-   */
-  public static boolean checkAndCreateFolder(String path) {
-    boolean created = false;
-    try {
-      FileFactory.FileType fileType = FileFactory.getFileType(path);
-      if (FileFactory.isFileExist(path, fileType)) {
-        created = true;
-      } else {
-        created = FileFactory.mkdirs(path, fileType);
-      }
-    } catch (IOException e) {
-      LOGGER.error(e.getMessage());
-    }
-    return created;
-  }
-
-  /**
-   * This method will return the size of a given file
-   */
-  public static long getFileSize(String filePath) {
-    FileFactory.FileType fileType = FileFactory.getFileType(filePath);
-    CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType);
-    return carbonFile.getSize();
-  }
-
-  /**
-   * This method will be used to get the bit length of the dimensions based on
-   * the dimension partitioner. If the partitioner value is 1, the column
-   * cardinality is incremented so that it fits at byte level:
-   * for example, if 3 bits are required to store one column value,
-   * 8 bits will be assigned to each value of that column. In this way
-   * we may waste some bits (at most 7). If the partitioner value is more than
-   * 1, a few columns are stored together, so the cardinality of that group
-   * is incremented to fit at byte level. For example, if the cardinality of 3
-   * columns stored together is [1,1,1], the number of bits required is
-   * [1,1,1]; the last value is then incremented so the group is byte
-   * aligned, giving [1,1,6]
-   *
-   * @param dimCardinality cardinality of each column
-   * @param dimPartitioner how columns are stored: if the value is 1 the column
-   *                       is stored column-wise; if it is more than 1 the
-   *                       column is grouped with other columns
-   * @return number of bits for each column
-   * @TODO for a row group only the last value is incremented, which is a
-   * problem when the last column of the group is selected most often in
-   * filter queries: comparisons concentrate there. Incrementing uniformly
-   * would distribute the comparisons
-   */
-  public static int[] getDimensionBitLength(int[] dimCardinality, int[] dimPartitioner) {
-    int[] bitLength = new int[dimCardinality.length];
-    int dimCounter = 0;
-    for (int i = 0; i < dimPartitioner.length; i++) {
-      if (dimPartitioner[i] == 1) {
-        // for columnar store
-        // fully filled bits means complete byte or number of bits
-        // assigned will be in
-        // multiplication of 8
-        bitLength[dimCounter] = getBitLengthFullyFilled(dimCardinality[dimCounter]);
-        dimCounter++;
-      } else {
-        // for row store
-        int totalSize = 0;
-        for (int j = 0; j < dimPartitioner[i]; j++) {
-          bitLength[dimCounter] = getIncrementedCardinality(dimCardinality[dimCounter]);
-          totalSize += bitLength[dimCounter];
-          dimCounter++;
-        }
-        // increment the last column so that the row group
-        // is stored byte aligned
-        int mod = totalSize % 8;
-        if (mod > 0) {
-          bitLength[dimCounter - 1] = bitLength[dimCounter - 1] + (8 - mod);
-        }
-      }
-    }
-    return bitLength;
-  }
-
-  /**
-   * Below method will be used to get the value compression model of the
-   * measure data chunk
-   *
-   * @param measureDataChunkList
-   * @return value compression model
-   */
-  public static ValueCompressionModel getValueCompressionModel(
-      List<DataChunk> measureDataChunkList) {
-    Object[] maxValue = new Object[measureDataChunkList.size()];
-    Object[] minValue = new Object[measureDataChunkList.size()];
-    Object[] uniqueValue = new Object[measureDataChunkList.size()];
-    int[] decimal = new int[measureDataChunkList.size()];
-    char[] type = new char[measureDataChunkList.size()];
-    byte[] dataTypeSelected = new byte[measureDataChunkList.size()];
-
-    // fill the metadata required for the value compression model
-    for (int i = 0; i < dataTypeSelected.length; i++) {
-      int indexOf = measureDataChunkList.get(i).getEncodingList().indexOf(Encoding.DELTA);
-      if (indexOf > -1) {
-        ValueEncoderMeta valueEncoderMeta =
-            measureDataChunkList.get(i).getValueEncoderMeta().get(indexOf);
-        maxValue[i] = valueEncoderMeta.getMaxValue();
-        minValue[i] = valueEncoderMeta.getMinValue();
-        uniqueValue[i] = valueEncoderMeta.getUniqueValue();
-        decimal[i] = valueEncoderMeta.getDecimal();
-        type[i] = valueEncoderMeta.getType();
-        dataTypeSelected[i] = valueEncoderMeta.getDataTypeSelected();
-      }
-    }
-    MeasureMetaDataModel measureMetadataModel =
-        new MeasureMetaDataModel(minValue, maxValue, decimal, dataTypeSelected.length, uniqueValue,
-            type, dataTypeSelected);
-    return ValueCompressionUtil.getValueCompressionModel(measureMetadataModel);
-  }
-
-  /**
-   * Below method will be used to check whether particular encoding is present
-   * in the dimension or not
-   *
-   * @param encodings list of encodings of the dimension
-   * @param encoding  encoding to search for
-   * @return true if the encoding is present in the dimension
-   */
-  public static boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
-    return encodings.contains(encoding);
-  }
-
-  /**
-   * below method is to check whether data type is present in the data type array
-   *
-   * @param dataType  data type to be searched
-   * @param dataTypes all data types
-   * @return if data type is present
-   */
-  public static boolean hasDataType(DataType dataType, DataType[] dataTypes) {
-    for (int i = 0; i < dataTypes.length; i++) {
-      if (dataType.equals(dataTypes[i])) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * below method is to check whether it is complex data type
-   *
-   * @param dataType  data type to be searched
-   * @return true if the data type is complex (ARRAY, STRUCT or MAP)
-   */
-  public static boolean hasComplexDataType(DataType dataType) {
-    switch (dataType) {
-      case ARRAY :
-      case STRUCT:
-      case MAP:
-        return true;
-      default:
-        return false;
-    }
-  }
-
-  public static boolean[] getDictionaryEncodingArray(QueryDimension[] queryDimensions) {
-    boolean[] dictionaryEncodingArray = new boolean[queryDimensions.length];
-    for (int i = 0; i < queryDimensions.length; i++) {
-      dictionaryEncodingArray[i] =
-          queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY);
-    }
-    return dictionaryEncodingArray;
-  }
-
-  public static boolean[] getDirectDictionaryEncodingArray(QueryDimension[] queryDimensions) {
-    boolean[] dictionaryEncodingArray = new boolean[queryDimensions.length];
-    for (int i = 0; i < queryDimensions.length; i++) {
-      dictionaryEncodingArray[i] =
-          queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY);
-    }
-    return dictionaryEncodingArray;
-  }
-
-  public static boolean[] getComplexDataTypeArray(QueryDimension[] queryDimensions) {
-    boolean[] dictionaryEncodingArray = new boolean[queryDimensions.length];
-    for (int i = 0; i < queryDimensions.length; i++) {
-      dictionaryEncodingArray[i] =
-          CarbonUtil.hasComplexDataType(queryDimensions[i].getDimension().getDataType());
-    }
-    return dictionaryEncodingArray;
-  }
-
-  /**
-   * Below method will be used to read the data file metadata
-   *
-   * @param filePath    file path
-   * @param blockOffset offset of the block in the file
-   * @param blockLength length of the block
-   * @return data file metadata instance
-   * @throws CarbonUtilException
-   */
-  public static DataFileFooter readMetadatFile(String filePath, long blockOffset, long blockLength)
-      throws CarbonUtilException {
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-    try {
-      return fileFooterConverter.readDataFileFooter(filePath, blockOffset, blockLength);
-    } catch (IOException e) {
-      throw new CarbonUtilException("Problem while reading the file metadata", e);
-    }
-  }
-
-  /**
-   * Below method will be used to get the surrogate key
-   *
-   * @param data   actual data
-   * @param buffer byte buffer which will be used to convert the data to integer value
-   * @return surrogate key
-   */
-  public static int getSurrogateKey(byte[] data, ByteBuffer buffer) {
-    int length = 4 - data.length;
-    for (int i = 0; i < length; i++) {
-      buffer.put((byte) 0);
-    }
-    buffer.put(data);
-    buffer.rewind();
-    int surrogate = buffer.getInt();
-    buffer.clear();
-    return surrogate;
-  }
-
-  /**
-   * Callable task to delete table folders and files
-   *
-   */
-  private static final class DeleteFolderAndFiles implements Callable<Void> {
-    private CarbonFile file;
-
-    private DeleteFolderAndFiles(CarbonFile file) {
-      this.file = file;
-    }
-
-    @Override public Void call() throws Exception {
-      deleteFoldersAndFiles(file);
-      return null;
-    }
-
-  }
-
-  /**
-   * class to sort aggregate folder list in descending order
-   */
-  private static class AggTableComparator implements Comparator<String> {
-    public int compare(String aggTable1, String aggTable2) {
-      int index1 = aggTable1.lastIndexOf(CarbonCommonConstants.UNDERSCORE);
-      int index2 = aggTable2.lastIndexOf(CarbonCommonConstants.UNDERSCORE);
-      int n1 = Integer.parseInt(aggTable1.substring(index1 + 1));
-      int n2 = Integer.parseInt(aggTable2.substring(index2 + 1));
-      return Integer.compare(n2, n1);
-    }
-  }
-
-  /**
-   * Below method will be used to identify the type of each dimension
-   *
-   * @param tableDimensionList table dimension list
-   * @return boolean array specifying true if dimension is dictionary
-   * and false if dimension is not a dictionary column
-   */
-  public static boolean[] identifyDimensionType(List<CarbonDimension> tableDimensionList) {
-    List<Boolean> isDictionaryDimensions = new ArrayList<Boolean>();
-    Set<Integer> processedColumnGroup = new HashSet<Integer>();
-    for (CarbonDimension carbonDimension : tableDimensionList) {
-      List<CarbonDimension> childs = carbonDimension.getListOfChildDimensions();
-      // assuming complex dimensions always come last
-      if (null != childs && childs.size() > 0) {
-        break;
-      }
-      if (carbonDimension.isColumnar() && hasEncoding(carbonDimension.getEncoder(),
-          Encoding.DICTIONARY)) {
-        isDictionaryDimensions.add(true);
-      } else if (!carbonDimension.isColumnar()) {
-        if (processedColumnGroup.add(carbonDimension.columnGroupId())) {
-          isDictionaryDimensions.add(true);
-        }
-      } else {
-        isDictionaryDimensions.add(false);
-      }
-    }
-    boolean[] primitive = ArrayUtils
-        .toPrimitive(isDictionaryDimensions.toArray(new Boolean[isDictionaryDimensions.size()]));
-    return primitive;
-  }
-
-  /**
-   * This method forms one single byte[] for all the high-cardinality dimensions:
-   * first the offsets of the variable-length byte[] values, then the
-   * actual values
-   *
-   * @param byteBufferArr
-   * @return byte[] key.
-   */
-  public static byte[] packByteBufferIntoSingleByteArray(ByteBuffer[] byteBufferArr) {
-    // an empty array means there is no no-dictionary data to pack.
-    if (null == byteBufferArr || byteBufferArr.length == 0) {
-      return null;
-    }
-    int noOfCol = byteBufferArr.length;
-    short offsetLen = (short) (noOfCol * 2);
-    int totalBytes = calculateTotalBytes(byteBufferArr) + offsetLen;
-    ByteBuffer buffer = ByteBuffer.allocate(totalBytes);
-    // writing the offset of the first element.
-    buffer.putShort(offsetLen);
-
-    // prepare index for byte []
-    for (int index = 0; index < byteBufferArr.length - 1; index++) {
-      ByteBuffer individualCol = byteBufferArr[index];
-      int noOfBytes = individualCol.capacity();
-      buffer.putShort((short) (offsetLen + noOfBytes));
-      offsetLen += noOfBytes;
-      individualCol.rewind();
-    }
-
-    // put actual data.
-    for (int index = 0; index < byteBufferArr.length; index++) {
-      ByteBuffer individualCol = byteBufferArr[index];
-      buffer.put(individualCol.array());
-    }
-
-    buffer.rewind();
-    return buffer.array();
-
-  }
-
-  /**
-   * To calculate the total bytes in byte Buffer[].
-   *
-   * @param byteBufferArr
-   * @return
-   */
-  private static int calculateTotalBytes(ByteBuffer[] byteBufferArr) {
-    int total = 0;
-    for (int index = 0; index < byteBufferArr.length; index++) {
-      total += byteBufferArr[index].capacity();
-    }
-    return total;
-  }
-
-  /**
-   * Find the dimension from metadata by its unique name. As of now the
-   * level name is taken as the unique name, so the user needs to give one
-   * unique name for each level, and that is the name mentioned in the query.
-   *
-   * @param dimensions
-   * @param carbonDim
-   * @return
-   */
-  public static CarbonDimension findDimension(List<CarbonDimension> dimensions, String carbonDim) {
-    CarbonDimension findDim = null;
-    for (CarbonDimension dimension : dimensions) {
-      if (dimension.getColName().equalsIgnoreCase(carbonDim)) {
-        findDim = dimension;
-        break;
-      }
-    }
-    return findDim;
-  }
-
-  /**
-   * This method will be used to clear the dictionary cache after its usage is complete
-   * so that, if the memory threshold is reached, it can be evicted from the LRU cache
-   *
-   * @param dictionary
-   */
-  public static void clearDictionaryCache(Dictionary dictionary) {
-    if (null != dictionary) {
-      dictionary.clear();
-    }
-  }
-
-  /**
-   * convert from wrapper to external data type
-   *
-   * @param dataType
-   * @return
-   */
-  public static org.carbondata.format.DataType fromWrapperToExternalDataType(DataType dataType) {
-
-    if (null == dataType) {
-      return null;
-    }
-    switch (dataType) {
-      case STRING:
-        return org.carbondata.format.DataType.STRING;
-      case INT:
-        return org.carbondata.format.DataType.INT;
-      case LONG:
-        return org.carbondata.format.DataType.LONG;
-      case DOUBLE:
-        return org.carbondata.format.DataType.DOUBLE;
-      case DECIMAL:
-        return org.carbondata.format.DataType.DECIMAL;
-      case TIMESTAMP:
-        return org.carbondata.format.DataType.TIMESTAMP;
-      case ARRAY:
-        return org.carbondata.format.DataType.ARRAY;
-      case STRUCT:
-        return org.carbondata.format.DataType.STRUCT;
-      default:
-        return org.carbondata.format.DataType.STRING;
-    }
-  }
-
-  /**
-   * convert from external to wrapper data type
-   *
-   * @param dataType
-   * @return
-   */
-  public static DataType fromExternalToWrapperDataType(org.carbondata.format.DataType dataType) {
-    if (null == dataType) {
-      return null;
-    }
-    switch (dataType) {
-      case STRING:
-        return DataType.STRING;
-      case INT:
-        return DataType.INT;
-      case LONG:
-        return DataType.LONG;
-      case DOUBLE:
-        return DataType.DOUBLE;
-      case DECIMAL:
-        return DataType.DECIMAL;
-      case TIMESTAMP:
-        return DataType.TIMESTAMP;
-      case ARRAY:
-        return DataType.ARRAY;
-      case STRUCT:
-        return DataType.STRUCT;
-      default:
-        return DataType.STRING;
-    }
-  }
-  /**
-   * @param dictionaryColumnCardinality
-   * @param wrapperColumnSchemaList
-   * @return the formatted cardinality, with -1 added for no-dictionary columns
-   */
-  public static int[] getFormattedCardinality(int[] dictionaryColumnCardinality,
-      List<ColumnSchema> wrapperColumnSchemaList) {
-    List<Integer> cardinality = new ArrayList<>();
-    int counter = 0;
-    for (int i = 0; i < wrapperColumnSchemaList.size(); i++) {
-      if (CarbonUtil.hasEncoding(wrapperColumnSchemaList.get(i).getEncodingList(),
-          org.carbondata.core.carbon.metadata.encoder.Encoding.DICTIONARY)) {
-        cardinality.add(dictionaryColumnCardinality[counter]);
-        counter++;
-      } else if (!wrapperColumnSchemaList.get(i).isDimensionColumn()) {
-        continue;
-      } else {
-        cardinality.add(-1);
-      }
-    }
-    return ArrayUtils.toPrimitive(cardinality.toArray(new Integer[cardinality.size()]));
-  }
-
-  public static List<ColumnSchema> getColumnSchemaList(List<CarbonDimension> carbonDimensionsList,
-      List<CarbonMeasure> carbonMeasureList) {
-    List<ColumnSchema> wrapperColumnSchemaList = new ArrayList<ColumnSchema>();
-    fillColumnSchemaListForComplexDims(carbonDimensionsList, wrapperColumnSchemaList);
-    for (CarbonMeasure carbonMeasure : carbonMeasureList) {
-      wrapperColumnSchemaList.add(carbonMeasure.getColumnSchema());
-    }
-    return wrapperColumnSchemaList;
-  }
-
-  private static void fillColumnSchemaListForComplexDims(
-      List<CarbonDimension> carbonDimensionsList, List<ColumnSchema> wrapperColumnSchemaList) {
-    for (CarbonDimension carbonDimension : carbonDimensionsList) {
-      wrapperColumnSchemaList.add(carbonDimension.getColumnSchema());
-      List<CarbonDimension> childDims = carbonDimension.getListOfChildDimensions();
-      if (null != childDims && childDims.size() > 0) {
-        fillColumnSchemaListForComplexDims(childDims, wrapperColumnSchemaList);
-      }
-    }
-  }
-  /**
-   * Below method will be used to get all the block index info from index file
-   *
-   * @param taskId                  task id of the file
-   * @param tableBlockInfoList      list of table block
-   * @param absoluteTableIdentifier absolute table identifier
-   * @return list of block info
-   * @throws CarbonUtilException if any problem while reading
-   */
-  public static List<DataFileFooter> readCarbonIndexFile(String taskId,
-      List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier absoluteTableIdentifier)
-      throws CarbonUtilException {
-    // need to sort the block info list by task in ascending order so that
-    // it syncs up with the block indexes read from the file
-    Collections.sort(tableBlockInfoList);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
-    // getting the index file path
-    // TODO: pass the proper partition number once partitioning is supported
-    String carbonIndexFilePath = carbonTablePath
-        .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId());
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-    try {
-      // read the index info and return
-      return fileFooterConverter.getIndexInfo(carbonIndexFilePath, tableBlockInfoList);
-    } catch (IOException e) {
-      throw new CarbonUtilException("Problem while reading the file metadata", e);
-    }
-  }
-
-}
-
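
As an aside for readers tracing the utilities above: the left-padding logic in
getSurrogateKey can be illustrated with a minimal, self-contained sketch. The
class below is illustrative only and is not part of the moved file; it assumes
the stored key bytes are big-endian and never longer than four bytes.

    import java.nio.ByteBuffer;

    public class SurrogateKeyExample {
      // Mirrors the padding in CarbonUtil.getSurrogateKey: left-pad the stored
      // key bytes to 4 bytes, then read them back as a big-endian int.
      static int toSurrogate(byte[] data) {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        for (int i = 0; i < 4 - data.length; i++) {
          buffer.put((byte) 0); // zero padding for keys shorter than 4 bytes
        }
        buffer.put(data);
        buffer.rewind();
        return buffer.getInt();
      }

      public static void main(String[] args) {
        // a 2-byte stored key {0x01, 0x02} decodes to surrogate key 258
        System.out.println(toSurrogate(new byte[] { 1, 2 }));
      }
    }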

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/util/CarbonUtilException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonUtilException.java b/core/src/main/java/org/carbondata/core/util/CarbonUtilException.java
deleted file mode 100644
index 9c54a07..0000000
--- a/core/src/main/java/org/carbondata/core/util/CarbonUtilException.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.util;
-
-import java.util.Locale;
-
-public class CarbonUtilException extends Exception {
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The Error message.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public CarbonUtilException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public CarbonUtilException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * This method is used to get the localized message.
-   *
-   * @param locale - A Locale object represents a specific geographical,
-   *               political, or cultural region.
-   * @return - Localized error message (currently always empty).
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-  /**
-   * getLocalizedMessage
-   */
-  @Override public String getLocalizedMessage() {
-    return super.getLocalizedMessage();
-  }
-
-  /**
-   * getMessage
-   */
-  @Override public String getMessage() {
-    return this.msg;
-  }
-}
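
For context, CarbonUtilException is used throughout CarbonUtil in a
wrap-and-rethrow style. Below is a minimal sketch of that pattern; loadFooter
and readFooterBytes are hypothetical names introduced for illustration and are
not part of the original sources.

    import java.io.IOException;

    public final class FooterAccess {
      public static byte[] loadFooter(String filePath) throws CarbonUtilException {
        try {
          return readFooterBytes(filePath);
        } catch (IOException e) {
          // the (msg, Throwable) constructor preserves the low-level cause
          throw new CarbonUtilException("Problem while reading the file metadata", e);
        }
      }

      // hypothetical low-level reader, stubbed out for the sketch
      private static byte[] readFooterBytes(String filePath) throws IOException {
        throw new IOException("stub for illustration");
      }
    }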

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/carbondata/core/util/DataFileFooterConverter.java
deleted file mode 100644
index cb28386..0000000
--- a/core/src/main/java/org/carbondata/core/util/DataFileFooterConverter.java
+++ /dev/null
@@ -1,467 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.core.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
-import org.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
-import org.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
-import org.carbondata.core.carbon.metadata.blocklet.SegmentInfo;
-import org.carbondata.core.carbon.metadata.blocklet.compressor.ChunkCompressorMeta;
-import org.carbondata.core.carbon.metadata.blocklet.compressor.CompressionCodec;
-import org.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
-import org.carbondata.core.carbon.metadata.blocklet.datachunk.PresenceMeta;
-import org.carbondata.core.carbon.metadata.blocklet.index.BlockletBTreeIndex;
-import org.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
-import org.carbondata.core.carbon.metadata.blocklet.index.BlockletMinMaxIndex;
-import org.carbondata.core.carbon.metadata.blocklet.sort.SortState;
-import org.carbondata.core.carbon.metadata.datatype.DataType;
-import org.carbondata.core.carbon.metadata.encoder.Encoding;
-import org.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
-import org.carbondata.core.datastorage.store.FileHolder;
-import org.carbondata.core.datastorage.store.impl.FileFactory;
-import org.carbondata.core.metadata.ValueEncoderMeta;
-import org.carbondata.core.reader.CarbonFooterReader;
-import org.carbondata.core.reader.CarbonIndexFileReader;
-import org.carbondata.format.BlockIndex;
-import org.carbondata.format.FileFooter;
-
-/**
- * Below class will be used to convert the thrift data file metadata
- * object to the wrapper object
- */
-public class DataFileFooterConverter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataFileFooterConverter.class.getName());
-
-  /**
-   * Below method will be used to get the index info from the index file
-   *
-   * @param filePath           file path of the index file
-   * @param tableBlockInfoList list of table block info
-   * @return list of index info
-   * @throws IOException problem while reading the index file
-   */
-  public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo> tableBlockInfoList)
-      throws IOException {
-    CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
-    List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
-    try {
-      // open the reader
-      indexReader.openThriftReader(filePath);
-      // get the index header
-      org.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
-      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
-      List<org.carbondata.format.ColumnSchema> table_columns = readIndexHeader.getTable_columns();
-      for (int i = 0; i < table_columns.size(); i++) {
-        columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(table_columns.get(i)));
-      }
-      // get the segment info
-      SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info());
-      BlockletIndex blockletIndex = null;
-      int counter = 0;
-      DataFileFooter dataFileFooter = null;
-      // read the block info from file
-      while (indexReader.hasNext()) {
-        BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
-        blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index());
-        dataFileFooter = new DataFileFooter();
-        dataFileFooter.setBlockletIndex(blockletIndex);
-        dataFileFooter.setColumnInTable(columnSchemaList);
-        dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
-        dataFileFooter.setTableBlockInfo(tableBlockInfoList.get(counter++));
-        dataFileFooter.setSegmentInfo(segmentInfo);
-        dataFileFooters.add(dataFileFooter);
-      }
-    } finally {
-      indexReader.closeThriftReader();
-    }
-    return dataFileFooters;
-  }
-
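-  /*
-   * Minimal usage sketch (the index file path is a placeholder; the block
-   * info list must line up positionally, and in size, with the BlockIndex
-   * entries read from the file):
-   *   DataFileFooterConverter converter = new DataFileFooterConverter();
-   *   List<DataFileFooter> footers =
-   *       converter.getIndexInfo(indexFilePath, tableBlockInfoList);
-   */
-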
-  /**
-   * Below method will be used to convert the thrift file meta to the wrapper file meta
-   */
-  public DataFileFooter readDataFileFooter(String filePath, long blockOffset, long blockLength)
-      throws IOException {
-    DataFileFooter dataFileFooter = new DataFileFooter();
-    FileHolder fileReader = null;
-    try {
-      long completeBlockLength = blockOffset + blockLength;
-      long footerPointer = completeBlockLength - 8;
-      fileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath));
-      long actualFooterOffset = fileReader.readLong(filePath, footerPointer);
-      CarbonFooterReader reader = new CarbonFooterReader(filePath, actualFooterOffset);
-      FileFooter footer = reader.readFooter();
-      dataFileFooter.setVersionId(footer.getVersion());
-      dataFileFooter.setNumberOfRows(footer.getNum_rows());
-      dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
-      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
-      List<org.carbondata.format.ColumnSchema> table_columns = footer.getTable_columns();
-      for (int i = 0; i < table_columns.size(); i++) {
-        columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(table_columns.get(i)));
-      }
-      dataFileFooter.setColumnInTable(columnSchemaList);
-
-      List<org.carbondata.format.BlockletIndex> leaf_node_indices_Thrift =
-          footer.getBlocklet_index_list();
-      List<BlockletIndex> blockletIndexList = new ArrayList<BlockletIndex>();
-      for (int i = 0; i < leaf_node_indices_Thrift.size(); i++) {
-        BlockletIndex blockletIndex = getBlockletIndex(leaf_node_indices_Thrift.get(i));
-        blockletIndexList.add(blockletIndex);
-      }
-
-      List<org.carbondata.format.BlockletInfo> leaf_node_infos_Thrift =
-          footer.getBlocklet_info_list();
-      List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
-      for (int i = 0; i < leaf_node_infos_Thrift.size(); i++) {
-        BlockletInfo blockletInfo = getBlockletInfo(leaf_node_infos_Thrift.get(i));
-        blockletInfo.setBlockletIndex(blockletIndexList.get(i));
-        blockletInfoList.add(blockletInfo);
-      }
-      dataFileFooter.setBlockletList(blockletInfoList);
-      dataFileFooter.setBlockletIndex(getBlockletIndexForDataFileFooter(blockletIndexList));
-    } finally {
-      if (null != fileReader) {
-        fileReader.finish();
-      }
-    }
-    return dataFileFooter;
-  }
-
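-  /*
-   * Block layout assumed by the method above: the last 8 bytes of the block
-   * hold a long giving the absolute offset at which the thrift FileFooter
-   * starts, i.e.
-   *   [ column data ... | thrift FileFooter | footer start offset (8 bytes) ]
-   */
-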
-  /**
-   * Below method will be used to get blocklet index for data file meta
-   *
-   * @param blockletIndexList
-   * @return blocklet index
-   */
-  private BlockletIndex getBlockletIndexForDataFileFooter(List<BlockletIndex> blockletIndexList) {
-    BlockletIndex blockletIndex = new BlockletIndex();
-    BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
-    blockletBTreeIndex.setStartKey(blockletIndexList.get(0).getBtreeIndex().getStartKey());
-    blockletBTreeIndex
-        .setEndKey(blockletIndexList.get(blockletIndexList.size() - 1).getBtreeIndex().getEndKey());
-    blockletIndex.setBtreeIndex(blockletBTreeIndex);
-    byte[][] currentMinValue = blockletIndexList.get(0).getMinMaxIndex().getMinValues().clone();
-    byte[][] currentMaxValue = blockletIndexList.get(0).getMinMaxIndex().getMaxValues().clone();
-    byte[][] minValue = null;
-    byte[][] maxValue = null;
-    for (int i = 1; i < blockletIndexList.size(); i++) {
-      minValue = blockletIndexList.get(i).getMinMaxIndex().getMinValues();
-      maxValue = blockletIndexList.get(i).getMinMaxIndex().getMaxValues();
-      for (int j = 0; j < maxValue.length; j++) {
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue[j]) > 0) {
-          currentMinValue[j] = minValue[j].clone();
-        }
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMaxValue[j], maxValue[j]) < 0) {
-          currentMaxValue[j] = maxValue[j].clone();
-        }
-      }
-    }
-
-    BlockletMinMaxIndex minMax = new BlockletMinMaxIndex();
-    minMax.setMaxValues(currentMaxValue);
-    minMax.setMinValues(currentMinValue);
-    blockletIndex.setMinMaxIndex(minMax);
-    return blockletIndex;
-  }
-
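-  /*
-   * Worked example of the merge above for a single column across two
-   * blocklets (byte[] values compared with ByteUtil.UnsafeComparer):
-   *   blocklet 0: min = {5}, max = {9}
-   *   blocklet 1: min = {3}, max = {7}
-   *   merged    : min = {3}, max = {9}
-   */
-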
-  private ColumnSchema thriftColumnSchemaToWrapperColumnSchema(
-      org.carbondata.format.ColumnSchema externalColumnSchema) {
-    ColumnSchema wrapperColumnSchema = new ColumnSchema();
-    wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
-    wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
-    wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar());
-    wrapperColumnSchema
-        .setDataType(thriftDataTypeToWrapperDataType(externalColumnSchema.data_type));
-    wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension());
-    List<Encoding> encoders = new ArrayList<Encoding>();
-    for (org.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) {
-      encoders.add(fromExternalToWrapperEncoding(encoder));
-    }
-    wrapperColumnSchema.setEncodingList(encoders);
-    wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child());
-    wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision());
-    wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id());
-    wrapperColumnSchema.setScale(externalColumnSchema.getScale());
-    wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
-    wrapperColumnSchema.setAggregateFunction(externalColumnSchema.getAggregate_function());
-    return wrapperColumnSchema;
-  }
-
-  /**
-   * Below method will be used to convert the thrift blocklet info to the
-   * wrapper blocklet info
-   *
-   * @param blockletInfoThrift blocklet info of the thrift
-   * @return blocklet info wrapper
-   */
-  private BlockletInfo getBlockletInfo(org.carbondata.format.BlockletInfo blockletInfoThrift) {
-    BlockletInfo blockletInfo = new BlockletInfo();
-    List<DataChunk> dimensionColumnChunk = new ArrayList<DataChunk>();
-    List<DataChunk> measureChunk = new ArrayList<DataChunk>();
-    Iterator<org.carbondata.format.DataChunk> column_data_chunksIterator =
-        blockletInfoThrift.getColumn_data_chunksIterator();
-    if (null != column_data_chunksIterator) {
-      while (column_data_chunksIterator.hasNext()) {
-        org.carbondata.format.DataChunk next = column_data_chunksIterator.next();
-        if (next.isRowMajor()) {
-          dimensionColumnChunk.add(getDataChunk(next, false));
-        } else if (next.getEncoders().contains(org.carbondata.format.Encoding.DELTA)) {
-          measureChunk.add(getDataChunk(next, true));
-        } else {
-          dimensionColumnChunk.add(getDataChunk(next, false));
-        }
-      }
-    }
-    blockletInfo.setDimensionColumnChunk(dimensionColumnChunk);
-    blockletInfo.setMeasureColumnChunk(measureChunk);
-    blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
-    return blockletInfo;
-  }
-
-  /**
-   * Below method will be used to convert the thrift encoding to the wrapper encoding
-   *
-   * @param encoderThrift thrift encoding
-   * @return wrapper encoding
-   */
-  private Encoding fromExternalToWrapperEncoding(org.carbondata.format.Encoding encoderThrift) {
-    switch (encoderThrift) {
-      case DICTIONARY:
-        return Encoding.DICTIONARY;
-      case DELTA:
-        return Encoding.DELTA;
-      case RLE:
-        return Encoding.RLE;
-      case INVERTED_INDEX:
-        return Encoding.INVERTED_INDEX;
-      case BIT_PACKED:
-        return Encoding.BIT_PACKED;
-      case DIRECT_DICTIONARY:
-        return Encoding.DIRECT_DICTIONARY;
-      default:
-        return Encoding.DICTIONARY;
-    }
-  }
-
-  /**
-   * Below method will be used to convert the thrift compression to wrapper
-   * compression codec
-   *
-   * @param compressionCodecThrift
-   * @return wrapper compression codec
-   */
-  private CompressionCodec getCompressionCodec(
-      org.carbondata.format.CompressionCodec compressionCodecThrift) {
-    switch (compressionCodecThrift) {
-      case SNAPPY:
-        return CompressionCodec.SNAPPY;
-      default:
-        return CompressionCodec.SNAPPY;
-    }
-  }
-
-  /**
-   * Below method will be used to convert thrift segment object to wrapper
-   * segment object
-   *
-   * @param segmentInfo thrift segment info object
-   * @return wrapper segment info object
-   */
-  private SegmentInfo getSegmentInfo(org.carbondata.format.SegmentInfo segmentInfo) {
-    SegmentInfo info = new SegmentInfo();
-    int[] cardinality = new int[segmentInfo.getColumn_cardinalities().size()];
-    for (int i = 0; i < cardinality.length; i++) {
-      cardinality[i] = segmentInfo.getColumn_cardinalities().get(i);
-    }
-    info.setColumnCardinality(cardinality);
-    info.setNumberOfColumns(segmentInfo.getNum_cols());
-    return info;
-  }
-
-  /**
-   * Below method will be used to convert the blocklet index of thrift to
-   * wrapper
-   *
-   * @param blockletIndexThrift
-   * @return blocklet index wrapper
-   */
-  private BlockletIndex getBlockletIndex(org.carbondata.format.BlockletIndex blockletIndexThrift) {
-    org.carbondata.format.BlockletBTreeIndex btreeIndex = blockletIndexThrift.getB_tree_index();
-    org.carbondata.format.BlockletMinMaxIndex minMaxIndex = blockletIndexThrift.getMin_max_index();
-    return new BlockletIndex(
-        new BlockletBTreeIndex(btreeIndex.getStart_key(), btreeIndex.getEnd_key()),
-        new BlockletMinMaxIndex(minMaxIndex.getMin_values(), minMaxIndex.getMax_values()));
-  }
-
-  /**
-   * Below method will be used to convert the thrift compression meta to
-   * wrapper chunk compression meta
-   *
-   * @param chunkCompressionMetaThrift
-   * @return chunkCompressionMetaWrapper
-   */
-  private ChunkCompressorMeta getChunkCompressionMeta(
-      org.carbondata.format.ChunkCompressionMeta chunkCompressionMetaThrift) {
-    ChunkCompressorMeta compressorMeta = new ChunkCompressorMeta();
-    compressorMeta
-        .setCompressor(getCompressionCodec(chunkCompressionMetaThrift.getCompression_codec()));
-    compressorMeta.setCompressedSize(chunkCompressionMetaThrift.getTotal_compressed_size());
-    compressorMeta.setUncompressedSize(chunkCompressionMetaThrift.getTotal_uncompressed_size());
-    return compressorMeta;
-  }
-
-  /**
-   * Below method will be used to convert the thrift data type to wrapper data
-   * type
-   *
-   * @param dataTypeThrift
-   * @return dataType wrapper
-   */
-  private DataType thriftDataTypeToWrapperDataType(org.carbondata.format.DataType dataTypeThrift) {
-    switch (dataTypeThrift) {
-      case STRING:
-        return DataType.STRING;
-      case SHORT:
-        return DataType.SHORT;
-      case INT:
-        return DataType.INT;
-      case LONG:
-        return DataType.LONG;
-      case DOUBLE:
-        return DataType.DOUBLE;
-      case DECIMAL:
-        return DataType.DECIMAL;
-      case TIMESTAMP:
-        return DataType.TIMESTAMP;
-      case ARRAY:
-        return DataType.ARRAY;
-      case STRUCT:
-        return DataType.STRUCT;
-      default:
-        return DataType.STRING;
-    }
-  }
-
-  /**
-   * Below method will be used to convert the thrift presence meta to wrapper
-   * presence meta
-   *
-   * @param presentMetadataThrift
-   * @return wrapper presence meta
-   */
-  private PresenceMeta getPresenceMeta(org.carbondata.format.PresenceMeta presentMetadataThrift) {
-    PresenceMeta presenceMeta = new PresenceMeta();
-    presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
-    presenceMeta.setBitSet(BitSet.valueOf(presentMetadataThrift.getPresent_bit_stream()));
-    return presenceMeta;
-  }
-
-  /**
-   * Below method will be used to convert the thrift object to wrapper object
-   *
-   * @param sortStateThrift
-   * @return wrapper sort state object
-   */
-  private SortState getSortState(org.carbondata.format.SortState sortStateThrift) {
-    if (sortStateThrift == org.carbondata.format.SortState.SORT_EXPLICIT) {
-      return SortState.SORT_EXPLICT;
-    } else if (sortStateThrift == org.carbondata.format.SortState.SORT_NATIVE) {
-      return SortState.SORT_NATIVE;
-    } else {
-      return SortState.SORT_NONE;
-    }
-  }
-
-  /**
-   * Below method will be used to convert the thrift data chunk to wrapper
-   * data chunk
-   *
-   * @param datachunkThrift
-   * @return wrapper data chunk
-   */
-  private DataChunk getDataChunk(org.carbondata.format.DataChunk datachunkThrift,
-      boolean isPresenceMetaPresent) {
-    DataChunk dataChunk = new DataChunk();
-    dataChunk.setColumnUniqueIdList(datachunkThrift.getColumn_ids());
-    dataChunk.setDataPageLength(datachunkThrift.getData_page_length());
-    dataChunk.setDataPageOffset(datachunkThrift.getData_page_offset());
-    if (isPresenceMetaPresent) {
-      dataChunk.setNullValueIndexForColumn(getPresenceMeta(datachunkThrift.getPresence()));
-    }
-    dataChunk.setRlePageLength(datachunkThrift.getRle_page_length());
-    dataChunk.setRlePageOffset(datachunkThrift.getRle_page_offset());
-    dataChunk.setRowMajor(datachunkThrift.isRowMajor());
-    dataChunk.setRowIdPageLength(datachunkThrift.getRowid_page_length());
-    dataChunk.setRowIdPageOffset(datachunkThrift.getRowid_page_offset());
-    dataChunk.setSortState(getSortState(datachunkThrift.getSort_state()));
-    dataChunk.setChunkCompressionMeta(getChunkCompressionMeta(datachunkThrift.getChunk_meta()));
-    List<Encoding> encodingList = new ArrayList<Encoding>(datachunkThrift.getEncoders().size());
-    for (int i = 0; i < datachunkThrift.getEncoders().size(); i++) {
-      encodingList.add(fromExternalToWrapperEncoding(datachunkThrift.getEncoders().get(i)));
-    }
-    dataChunk.setEncoderList(encodingList);
-    if (encodingList.contains(Encoding.DELTA)) {
-      List<ByteBuffer> thriftEncoderMeta = datachunkThrift.getEncoder_meta();
-      List<ValueEncoderMeta> encodeMetaList =
-          new ArrayList<ValueEncoderMeta>(thriftEncoderMeta.size());
-      for (int i = 0; i < thriftEncoderMeta.size(); i++) {
-        encodeMetaList.add(deserializeEncoderMeta(thriftEncoderMeta.get(i).array()));
-      }
-      dataChunk.setValueEncoderMeta(encodeMetaList);
-    }
-    return dataChunk;
-  }
-
-  /**
-   * Below method will be used to convert the encode metadata to
-   * ValueEncoderMeta object
-   *
-   * @param encoderMeta
-   * @return ValueEncoderMeta object
-   */
-  private ValueEncoderMeta deserializeEncoderMeta(byte[] encoderMeta) {
-    // TODO : should remove the unnecessary fields.
-    ByteArrayInputStream aos = null;
-    ObjectInputStream objStream = null;
-    ValueEncoderMeta meta = null;
-    try {
-      aos = new ByteArrayInputStream(encoderMeta);
-      objStream = new ObjectInputStream(aos);
-      meta = (ValueEncoderMeta) objStream.readObject();
-    } catch (ClassNotFoundException e) {
-      LOGGER.error(e);
-    } catch (IOException e) {
-      LOGGER.error(e);
-    } finally {
-      // close the stream on all paths, not only when an IOException occurs
-      CarbonUtil.closeStreams(objStream);
-    }
-    return meta;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/carbondata/core/util/DataTypeUtil.java
deleted file mode 100644
index 92316d5..0000000
--- a/core/src/main/java/org/carbondata/core/util/DataTypeUtil.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.util;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.math.RoundingMode;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.carbon.metadata.datatype.DataType;
-import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
-import org.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
-import org.carbondata.core.constants.CarbonCommonConstants;
-
-import org.apache.commons.lang.NumberUtils;
-import org.apache.spark.unsafe.types.UTF8String;
-
-public final class DataTypeUtil {
-
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataTypeUtil.class.getName());
-
-  /**
-   * This method will convert a given value to its specific type
-   *
-   * @param msrValue
-   * @param dataType
-   * @param carbonMeasure
-   * @return
-   */
-  public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
-      CarbonMeasure carbonMeasure) {
-    switch (dataType) {
-      case DECIMAL:
-        BigDecimal bigDecimal =
-            new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
-        return normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision());
-      case INT:
-        return Double.valueOf(msrValue).longValue();
-      case LONG:
-        return Long.valueOf(msrValue);
-      default:
-        return Double.valueOf(msrValue);
-    }
-  }
-
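-  /*
-   * Sample conversions (a sketch; "m" stands for a CarbonMeasure, whose scale
-   * and precision are consulted only in the DECIMAL case):
-   *   getMeasureValueBasedOnDataType("12.345", DataType.DECIMAL, m) // BigDecimal rescaled to m's scale, or null if precision exceeded
-   *   getMeasureValueBasedOnDataType("42", DataType.INT, m)         // Long 42 (parsed via Double)
-   *   getMeasureValueBasedOnDataType("1.5", DataType.DOUBLE, m)     // Double 1.5
-   */
-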
-  /**
-   * This method will validate the total number of digits of the value against
-   * the max precision allowed
-   *
-   * @param bigDecimal value to validate
-   * @param allowedPrecision precision configured by the user
-   * @return the same value, or null when its precision exceeds the allowed precision
-   */
-  private static BigDecimal normalizeDecimalValue(BigDecimal bigDecimal, int allowedPrecision) {
-    if (bigDecimal.precision() > allowedPrecision) {
-      return null;
-    }
-    return bigDecimal;
-  }
-
-  /**
-   * This method will return the type of measure based on its data type
-   *
-   * @param dataType
-   * @return
-   */
-  public static char getAggType(DataType dataType) {
-    switch (dataType) {
-      case DECIMAL:
-        return CarbonCommonConstants.BIG_DECIMAL_MEASURE;
-      case INT:
-      case LONG:
-        return CarbonCommonConstants.BIG_INT_MEASURE;
-      default:
-        return CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE;
-    }
-  }
-
-  /**
-   * This method will convert a big decimal value to bytes
-   *
-   * @param num
-   * @return
-   */
-  public static byte[] bigDecimalToByte(BigDecimal num) {
-    BigInteger sig = num.unscaledValue();
-    int scale = num.scale();
-    byte[] bscale = new byte[] { (byte) (scale) };
-    byte[] buff = sig.toByteArray();
-    byte[] completeArr = new byte[buff.length + bscale.length];
-    System.arraycopy(bscale, 0, completeArr, 0, bscale.length);
-    System.arraycopy(buff, 0, completeArr, bscale.length, buff.length);
-    return completeArr;
-  }
-
-  /**
-   * This method will convert a byte value back to big decimal value
-   *
-   * @param raw
-   * @return
-   */
-  public static BigDecimal byteToBigDecimal(byte[] raw) {
-    int scale = (raw[0] & 0xFF);
-    byte[] unscale = new byte[raw.length - 1];
-    System.arraycopy(raw, 1, unscale, 0, unscale.length);
-    BigInteger sig = new BigInteger(unscale);
-    return new BigDecimal(sig, scale);
-  }
-
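-  /*
-   * Round-trip sketch of the byte format above: one leading scale byte
-   * followed by the two's-complement unscaled value, so the scale must fit
-   * in 0..255.
-   *   byte[] raw = bigDecimalToByte(new BigDecimal("12.34"));
-   *   // raw = { 2, 4, -46 }: scale 2, then unscaled value 1234 (0x04D2)
-   *   BigDecimal back = byteToBigDecimal(raw);  // 12.34
-   */
-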
-  /**
-   * returns the DataType corresponding to the given string value
-   *
-   * @param dataTypeStr data type name, e.g. "TIMESTAMP"
-   * @return the corresponding DataType; STRING when the name is not recognized
-   */
-  public static DataType getDataType(String dataTypeStr) {
-    DataType dataType = null;
-    switch (dataTypeStr) {
-      case "TIMESTAMP":
-        dataType = DataType.TIMESTAMP;
-        break;
-      case "STRING":
-        dataType = DataType.STRING;
-        break;
-      case "INT":
-        dataType = DataType.INT;
-        break;
-      case "SHORT":
-        dataType = DataType.SHORT;
-        break;
-      case "LONG":
-        dataType = DataType.LONG;
-        break;
-      case "DOUBLE":
-        dataType = DataType.DOUBLE;
-        break;
-      case "DECIMAL":
-        dataType = DataType.DECIMAL;
-        break;
-      case "ARRAY":
-        dataType = DataType.ARRAY;
-        break;
-      case "STRUCT":
-        dataType = DataType.STRUCT;
-        break;
-      case "MAP":
-      default:
-        dataType = DataType.STRING;
-    }
-    return dataType;
-  }
-
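-  /*
-   * e.g. getDataType("TIMESTAMP") -> DataType.TIMESTAMP,
-   *      getDataType("MAP")       -> DataType.STRING (the fallback, shared
-   *      with any unrecognized name)
-   */
-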
-  /**
-   * Below method will be used to check whether the input data is a valid string
-   * for the given data type. If the string is not parseable, false is returned.
-   */
-  public static boolean isValidData(String data, DataType actualDataType) {
-    if (null == data) {
-      return false;
-    }
-    try {
-      switch (actualDataType) {
-        case SHORT:
-        case INT:
-        case LONG:
-        case DOUBLE:
-        case DECIMAL:
-          return NumberUtils.isNumber(data);
-        case TIMESTAMP:
-          if (data.isEmpty()) {
-            return false;
-          }
-          SimpleDateFormat parser = new SimpleDateFormat(CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-                  CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
-          try {
-            parser.parse(data);
-            return true;
-          } catch (ParseException e) {
-            return false;
-          }
-        default:
-          return true;
-      }
-    } catch (NumberFormatException ex) {
-      return false;
-    }
-  }
-
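-  /*
-   * Expected results (sketch; the TIMESTAMP pattern comes from the
-   * carbon.timestamp.format property, assumed here to be the default
-   * "yyyy-MM-dd HH:mm:ss"):
-   *   isValidData("12.5", DataType.DOUBLE)                    // true
-   *   isValidData("abc", DataType.INT)                        // false
-   *   isValidData(null, DataType.STRING)                      // false
-   *   isValidData("2016-08-15 07:08:56", DataType.TIMESTAMP)  // true
-   */
-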
-  /**
-   * Below method will be used to convert the data passed to its actual data
-   * type
-   *
-   * @param data           data
-   * @param actualDataType actual data type
-   * @return actual data after conversion
-   */
-  public static Object getDataBasedOnDataType(String data, DataType actualDataType) {
-
-    if (null == data || CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(data)) {
-      return null;
-    }
-    try {
-      switch (actualDataType) {
-        case INT:
-          if (data.isEmpty()) {
-            return null;
-          }
-          return Integer.parseInt(data);
-        case SHORT:
-          if (data.isEmpty()) {
-            return null;
-          }
-          return Short.parseShort(data);
-        case DOUBLE:
-          if (data.isEmpty()) {
-            return null;
-          }
-          return Double.parseDouble(data);
-        case LONG:
-          if (data.isEmpty()) {
-            return null;
-          }
-          return Long.parseLong(data);
-        case TIMESTAMP:
-          if (data.isEmpty()) {
-            return null;
-          }
-          SimpleDateFormat parser = new SimpleDateFormat(CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-                  CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
-          Date dateToStr = null;
-          try {
-            dateToStr = parser.parse(data);
-            return dateToStr.getTime() * 1000;
-          } catch (ParseException e) {
-            LOGGER.error("Cannot convert" + data + " to Time/Long type value" + e.getMessage());
-            return null;
-          }
-        case DECIMAL:
-          if (data.isEmpty()) {
-            return null;
-          }
-          java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data);
-          scala.math.BigDecimal scalaDecVal = new scala.math.BigDecimal(javaDecVal);
-          org.apache.spark.sql.types.Decimal decConverter =
-              new org.apache.spark.sql.types.Decimal();
-          return decConverter.set(scalaDecVal);
-        default:
-          return UTF8String.fromString(data);
-      }
-    } catch (NumberFormatException ex) {
-      LOGGER.error("Problem while converting data type" + data);
-      return null;
-    }
-
-  }
-
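-  /*
-   * The TIMESTAMP case above returns microseconds (getTime() * 1000),
-   * presumably to line up with Spark's internal timestamp representation;
-   * the default case wraps the value as a Spark UTF8String. Sketch:
-   *   getDataBasedOnDataType("42", DataType.INT)      // Integer 42
-   *   getDataBasedOnDataType("", DataType.INT)        // null (empty input)
-   *   getDataBasedOnDataType("abc", DataType.STRING)  // UTF8String "abc"
-   */
-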
-  public static Object getMeasureDataBasedOnDataType(Object data, DataType dataType) {
-
-    if (null == data) {
-      return null;
-    }
-    try {
-      switch (dataType) {
-        case DOUBLE:
-          return data;
-        case LONG:
-          return data;
-        case DECIMAL:
-          java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data.toString());
-          scala.math.BigDecimal scalaDecVal = new scala.math.BigDecimal(javaDecVal);
-          org.apache.spark.sql.types.Decimal decConverter =
-              new org.apache.spark.sql.types.Decimal();
-          return decConverter.set(scalaDecVal);
-        default:
-          return data;
-      }
-    } catch (NumberFormatException ex) {
-      LOGGER.error("Problem while converting data type" + data);
-      return null;
-    }
-
-  }
-
-  /**
-   * Below method will be used to check whether any non-parseable data is
-   * present. If so, null is returned so that the system can fall back to
-   * the default null member value.
-   *
-   * @param data           data
-   * @param actualDataType actual data type
-   * @return actual data after conversion
-   */
-  public static Object normalizeIntAndLongValues(String data, DataType actualDataType) {
-    if (null == data) {
-      return null;
-    }
-    try {
-      Object parsedValue = null;
-      switch (actualDataType) {
-        case INT:
-          parsedValue = Integer.parseInt(data);
-          break;
-        case LONG:
-          parsedValue = Long.parseLong(data);
-          break;
-        default:
-          return data;
-      }
-      if (null != parsedValue) {
-        return data;
-      }
-      return null;
-    } catch (NumberFormatException ex) {
-      return null;
-    }
-  }
-
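-  /*
-   * e.g. normalizeIntAndLongValues("123", DataType.INT)    -> "123"
-   *      normalizeIntAndLongValues("12x", DataType.INT)    -> null
-   *      normalizeIntAndLongValues("123", DataType.STRING) -> "123" (returned as-is)
-   */
-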
-  /**
-   * This method will parse a given string value corresponding to its data type
-   *
-   * @param value     value to parse
-   * @param dimension dimension to get data type and precision and scale in case of decimal
-   *                  data type
-   * @return
-   */
-  public static String normalizeColumnValueForItsDataType(String value, CarbonDimension dimension) {
-    try {
-      Object parsedValue = null;
-      // validation will not be done for timestamp datatype as for timestamp direct dictionary
-      // is generated. No dictionary file is created for timestamp datatype column
-      switch (dimension.getDataType()) {
-        case DECIMAL:
-          return parseStringToBigDecimal(value, dimension);
-        case INT:
-        case LONG:
-          parsedValue = normalizeIntAndLongValues(value, dimension.getDataType());
-          break;
-        case DOUBLE:
-          parsedValue = Double.parseDouble(value);
-          break;
-        default:
-          return value;
-      }
-      if (null != parsedValue) {
-        return value;
-      }
-      return null;
-    } catch (Exception e) {
-      return null;
-    }
-  }
-
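-  /*
-   * Sketch for a DECIMAL dimension configured with precision 5 and scale 2:
-   *   normalizeColumnValueForItsDataType("123.456", dim)  // "123.46" (rescaled HALF_UP)
-   *   normalizeColumnValueForItsDataType("123456", dim)   // null (precision exceeded)
-   */
-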
-  /**
-   * This method will parse a value to its data type if the data type is decimal;
-   * otherwise it will return the value passed in
-   *
-   * @param value     value to be parsed
-   * @param dimension
-   * @return
-   */
-  public static String parseValue(String value, CarbonDimension dimension) {
-    try {
-      switch (dimension.getDataType()) {
-        case DECIMAL:
-          return parseStringToBigDecimal(value, dimension);
-        default:
-          return value;
-      }
-    } catch (Exception e) {
-      return null;
-    }
-  }
-
-  private static String parseStringToBigDecimal(String value, CarbonDimension dimension) {
-    BigDecimal bigDecimal = new BigDecimal(value)
-        .setScale(dimension.getColumnSchema().getScale(), RoundingMode.HALF_UP);
-    BigDecimal normalizedValue =
-        normalizeDecimalValue(bigDecimal, dimension.getColumnSchema().getPrecision());
-    if (null != normalizedValue) {
-      return normalizedValue.toString();
-    }
-    return null;
-  }
-}