You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/15 07:09:20 UTC

[35/52] [partial] incubator-carbondata git commit: Renamed packages to org.apache.carbondata and fixed errors

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
new file mode 100644
index 0000000..7acabf2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -0,0 +1,1428 @@
+/*
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.columnar.ColumnGroupModel;
+import org.apache.carbondata.core.datastorage.store.columnar.ColumnarKeyStoreDataHolder;
+import org.apache.carbondata.core.datastorage.store.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.datastorage.store.compression.MeasureMetaDataModel;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.scan.model.QueryDimension;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.pentaho.di.core.exception.KettleException;
+
+
+public final class CarbonUtil {
+
+  public static final String HDFS_PREFIX = "hdfs://";
+  public static final String VIEWFS_PREFIX = "viewfs://";
+  private static final String FS_DEFAULT_FS = "fs.defaultFS";
+
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * EIGHT
+   */
+  private static final int CONST_EIGHT = 8;
+
+  /**
+   * SEVEN
+   */
+  private static final int CONST_SEVEN = 7;
+
+  /**
+   * HUNDRED
+   */
+  private static final int CONST_HUNDRED = 100;
+
+  private static final Configuration conf = new Configuration(true);
+
+  private CarbonUtil() {
+
+  }
+
+  /**
+   * This method closes the streams
+   *
+   * @param streams - streams to close.
+   */
+  public static void closeStreams(Closeable... streams) {
+    // Added if to avoid NullPointerException in case one stream is being passed as null
+    if (null != streams) {
+      for (Closeable stream : streams) {
+        if (null != stream) {
+          try {
+            stream.close();
+          } catch (IOException e) {
+            LOGGER.error("Error while closing stream" + stream);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * @param baseStorePath
+   * @return
+   */
+  private static int createBaseStoreFolders(String baseStorePath) {
+    FileFactory.FileType fileType = FileFactory.getFileType(baseStorePath);
+    try {
+      if (!FileFactory.isFileExist(baseStorePath, fileType, false)) {
+        if (!FileFactory.mkdirs(baseStorePath, fileType)) {
+          return -1;
+        }
+      }
+    } catch (Exception e) {
+      return -1;
+    }
+    return 1;
+  }
+
+  /**
+   * This method checks whether Restructure Folder exists or not
+   * and if not exist then return the number with which folder need to created.
+   *
+   * @param baseStorePath -
+   *                      baselocation where folder will be created.
+   * @return counter
+   * counter with which folder will be created.
+   */
+  public static int checkAndReturnCurrentRestructFolderNumber(String baseStorePath,
+      final String filterType, final boolean isDirectory) {
+    if (null == baseStorePath || 0 == baseStorePath.length()) {
+      return -1;
+    }
+    // change the slashes to /
+    baseStorePath = baseStorePath.replace("\\", "/");
+
+    // check if string wnds with / then remove that.
+    if (baseStorePath.charAt(baseStorePath.length() - 1) == '/') {
+      baseStorePath = baseStorePath.substring(0, baseStorePath.lastIndexOf("/"));
+    }
+    int retValue = createBaseStoreFolders(baseStorePath);
+    if (-1 == retValue) {
+      return retValue;
+    }
+
+    CarbonFile carbonFile =
+        FileFactory.getCarbonFile(baseStorePath, FileFactory.getFileType(baseStorePath));
+
+    // List of directories
+    CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile pathname) {
+        if (isDirectory && pathname.isDirectory()) {
+          if (pathname.getAbsolutePath().indexOf(filterType) > -1) {
+            return true;
+          }
+        } else {
+          if (pathname.getAbsolutePath().indexOf(filterType) > -1) {
+            return true;
+          }
+        }
+
+        return false;
+      }
+    });
+
+    int counter = -1;
+
+    // if no folder exists then return -1
+    if (listFiles.length == 0) {
+      return counter;
+    }
+
+    counter = findCounterValue(filterType, listFiles, counter);
+    return counter;
+  }
+
+  public static int checkAndReturnCurrentLoadFolderNumber(String baseStorePath) {
+    return checkAndReturnCurrentRestructFolderNumber(baseStorePath, "Load_", true);
+  }
+
+  /**
+   * @param filterType
+   * @param listFiles
+   * @param counter
+   * @return
+   */
+  private static int findCounterValue(final String filterType, CarbonFile[] listFiles,
+      int counter) {
+    if ("Load_".equals(filterType)) {
+      for (CarbonFile files : listFiles) {
+        String folderName = getFolderName(files);
+        if (folderName.indexOf('.') > -1) {
+          folderName = folderName.substring(0, folderName.indexOf('.'));
+        }
+        String[] split = folderName.split("_");
+
+        if (split.length > 1 && counter < Integer.parseInt(split[1])) {
+          counter = Integer.parseInt(split[1]);
+        }
+      }
+    } else {
+      // Iterate list of Directories and find the counter value
+      for (CarbonFile eachFile : listFiles) {
+        String folderName = getFolderName(eachFile);
+        String[] split = folderName.split("_");
+        if (counter < Integer.parseInt(split[1])) {
+          counter = Integer.parseInt(split[1]);
+        }
+      }
+    }
+    return counter;
+  }
+
+  /**
+   * @param eachFile
+   * @return
+   */
+  private static String getFolderName(CarbonFile eachFile) {
+    String str = eachFile.getAbsolutePath();
+    str = str.replace("\\", "/");
+    int firstFolderIndex = str.lastIndexOf("/");
+    String folderName = str.substring(firstFolderIndex);
+    return folderName;
+  }
+
+  /**
+   * This method will be used to update the dimension cardinality
+   *
+   * @param dimCardinality
+   * @return new increment cardinality
+   */
+  public static int[] getIncrementedCardinality(int[] dimCardinality) {
+    // get the cardinality incr factor
+    final int incrValue = CarbonCommonConstants.CARDINALITY_INCREMENT_VALUE_DEFAULT_VAL;
+
+    int perIncr = 0;
+    int remainder = 0;
+    int[] newDimsC = new int[dimCardinality.length];
+    for (int i = 0; i < dimCardinality.length; i++) {
+      // get the incr
+      perIncr = (dimCardinality[i] * incrValue) / CONST_HUNDRED;
+
+      // if per incr is more than one the add to cardinality
+      if (perIncr > 0) {
+        newDimsC[i] = dimCardinality[i] + perIncr;
+      } else {
+        // else add one
+        newDimsC[i] = dimCardinality[i] + 1;
+      }
+      // check whether its in boundary condition
+      remainder = newDimsC[i] % CONST_EIGHT;
+      if (remainder == CONST_SEVEN) {
+        // then incr cardinality by 1
+        newDimsC[i] = dimCardinality[i] + 1;
+      }
+    }
+    // get the log bits of cardinality
+    for (int i = 0; i < newDimsC.length; i++) {
+      newDimsC[i] = Long.toBinaryString(newDimsC[i]).length();
+    }
+    return newDimsC;
+  }
+
+  public static int getIncrementedCardinality(int dimCardinality) {
+    // get the cardinality incr factor
+    final int incrValue = CarbonCommonConstants.CARDINALITY_INCREMENT_VALUE_DEFAULT_VAL;
+
+    int perIncr = 0;
+    int remainder = 0;
+    int newDimsC = 0;
+
+    // get the incr
+    perIncr = (dimCardinality * incrValue) / CONST_HUNDRED;
+
+    // if per incr is more than one the add to cardinality
+    if (perIncr > 0) {
+      newDimsC = dimCardinality + perIncr;
+    } else {
+      // else add one
+      newDimsC = dimCardinality + 1;
+    }
+    // check whether its in boundary condition
+    remainder = newDimsC % CONST_EIGHT;
+    if (remainder == CONST_SEVEN) {
+      // then incr cardinality by 1
+      newDimsC = dimCardinality + 1;
+    }
+    newDimsC = Long.toBinaryString(newDimsC).length();
+    // get the log bits of cardinality
+
+    return newDimsC;
+  }
+
+  /**
+   * return ColumnGroupModel. check ColumnGroupModel for detail
+   *
+   * @param columnGroups : column groups
+   * @return ColumnGroupModel  model
+   */
+  public static ColumnGroupModel getColGroupModel(int[][] columnGroups) {
+    int[] columnSplit = new int[columnGroups.length];
+    int noOfColumnStore = columnSplit.length;
+    boolean[] columnarStore = new boolean[noOfColumnStore];
+
+    for (int i = 0; i < columnGroups.length; i++) {
+      columnSplit[i] = columnGroups[i].length;
+      columnarStore[i] = columnGroups[i].length > 1 ? false : true;
+    }
+    ColumnGroupModel colGroupModel = new ColumnGroupModel();
+    colGroupModel.setNoOfColumnStore(noOfColumnStore);
+    colGroupModel.setColumnSplit(columnSplit);
+    colGroupModel.setColumnarStore(columnarStore);
+    colGroupModel.setColumnGroup(columnGroups);
+    return colGroupModel;
+  }
+
+  /**
+   * This method will be used to update the dimension cardinality
+   *
+   * @param dimCardinality
+   * @return new increment cardinality
+   */
+  public static int[] getIncrementedCardinalityFullyFilled(int[] dimCardinality) {
+    int[] newDimsC = new int[dimCardinality.length];
+    // get the log bits of cardinality
+    for (int i = 0; i < dimCardinality.length; i++) {
+      if (dimCardinality[i] == 0) {
+        //Array or struct type may have higher value
+        newDimsC[i] = 64;
+      } else {
+        int bitsLength = Long.toBinaryString(dimCardinality[i]).length();
+        int div = bitsLength / 8;
+        int mod = bitsLength % 8;
+        if (mod > 0) {
+          newDimsC[i] = 8 * (div + 1);
+        } else {
+          newDimsC[i] = bitsLength;
+        }
+      }
+    }
+    return newDimsC;
+  }
+
+  private static int getBitLengthFullyFilled(int dimlens) {
+    int bitsLength = Long.toBinaryString(dimlens).length();
+    int div = bitsLength / 8;
+    int mod = bitsLength % 8;
+    if (mod > 0) {
+      return 8 * (div + 1);
+    } else {
+      return bitsLength;
+    }
+  }
+
+  /**
+   * This method will be used to delete the folder and files
+   *
+   * @param path file path array
+   * @throws Exception exception
+   */
+  public static void deleteFoldersAndFiles(final File... path) throws CarbonUtilException {
+    try {
+      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+
+        @Override public Void run() throws Exception {
+          for (int i = 0; i < path.length; i++) {
+            deleteRecursive(path[i]);
+          }
+          return null;
+        }
+      });
+    } catch (IOException e) {
+      throw new CarbonUtilException("Error while deleting the folders and files");
+    } catch (InterruptedException e) {
+      throw new CarbonUtilException("Error while deleting the folders and files");
+    }
+
+  }
+
+  /**
+   * Recursively delete the files
+   *
+   * @param f File to be deleted
+   * @throws CarbonUtilException
+   */
+  private static void deleteRecursive(File f) throws CarbonUtilException {
+    if (f.isDirectory()) {
+      if (f.listFiles() != null) {
+        for (File c : f.listFiles()) {
+          deleteRecursive(c);
+        }
+      }
+    }
+    if (f.exists() && !f.delete()) {
+      throw new CarbonUtilException("Error while deleting the folders and files");
+    }
+  }
+
+  public static void deleteFoldersAndFiles(final CarbonFile... file) throws CarbonUtilException {
+    try {
+      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+
+        @Override public Void run() throws Exception {
+          for (int i = 0; i < file.length; i++) {
+            deleteRecursive(file[i]);
+          }
+          return null;
+        }
+      });
+    } catch (IOException e) {
+      throw new CarbonUtilException("Error while deleting the folders and files");
+    } catch (InterruptedException e) {
+      throw new CarbonUtilException("Error while deleting the folders and files");
+    }
+  }
+
+  public static String getBadLogPath(String storeLocation) {
+    String badLogStoreLocation =
+            CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
+
+    return badLogStoreLocation;
+  }
+
+  public static void deleteFoldersAndFilesSilent(final CarbonFile... file)
+      throws CarbonUtilException {
+    try {
+      UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+
+        @Override public Void run() throws Exception {
+          for (int i = 0; i < file.length; i++) {
+            deleteRecursiveSilent(file[i]);
+          }
+          return null;
+        }
+      });
+    } catch (IOException e) {
+      throw new CarbonUtilException("Error while deleting the folders and files");
+    } catch (InterruptedException e) {
+      throw new CarbonUtilException("Error while deleting the folders and files");
+    }
+  }
+
+  /**
+   * This function will rename the table to be deleted
+   *
+   * @param partitionCount
+   * @param storePath
+   * @param databaseName
+   * @param tableName
+   */
+  public static void renameTableForDeletion(int partitionCount, String storePath,
+      String databaseName, String tableName) {
+    String tableNameWithPartition = "";
+    String databaseNameWithPartition = "";
+    String fullPath = "";
+    String newFilePath = "";
+    String newFileName = "";
+    Callable<Void> c = null;
+    long time = System.currentTimeMillis();
+    FileFactory.FileType fileType = null;
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < partitionCount; i++) {
+      databaseNameWithPartition = databaseName + '_' + i;
+      tableNameWithPartition = tableName + '_' + i;
+      newFileName = tableNameWithPartition + '_' + time;
+      fullPath = storePath + File.separator + databaseNameWithPartition + File.separator
+          + tableNameWithPartition;
+      newFilePath =
+          storePath + File.separator + databaseNameWithPartition + File.separator + newFileName;
+      fileType = FileFactory.getFileType(fullPath);
+      try {
+        if (FileFactory.isFileExist(fullPath, fileType)) {
+          CarbonFile file = FileFactory.getCarbonFile(fullPath, fileType);
+          boolean isRenameSuccessfull = file.renameTo(newFilePath);
+          if (!isRenameSuccessfull) {
+            LOGGER.error("Problem renaming the table :: " + fullPath);
+            c = new DeleteFolderAndFiles(file);
+            executorService.submit(c);
+          } else {
+            c = new DeleteFolderAndFiles(FileFactory.getCarbonFile(newFilePath, fileType));
+            executorService.submit(c);
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.error("Problem renaming the table :: " + fullPath);
+      }
+    }
+    executorService.shutdown();
+  }
+
+  /**
+   * Recursively delete the files
+   *
+   * @param f File to be deleted
+   * @throws CarbonUtilException
+   */
+  private static void deleteRecursive(CarbonFile f) throws CarbonUtilException {
+    if (f.isDirectory()) {
+      if (f.listFiles() != null) {
+        for (CarbonFile c : f.listFiles()) {
+          deleteRecursive(c);
+        }
+      }
+    }
+    if (f.exists() && !f.delete()) {
+      throw new CarbonUtilException("Error while deleting the folders and files");
+    }
+  }
+
+  private static void deleteRecursiveSilent(CarbonFile f) throws CarbonUtilException {
+    if (f.isDirectory()) {
+      if (f.listFiles() != null) {
+        for (CarbonFile c : f.listFiles()) {
+          deleteRecursiveSilent(c);
+        }
+      }
+    }
+    if (f.exists() && !f.delete()) {
+      return;
+    }
+  }
+
+  public static void deleteFiles(File[] intermediateFiles) throws CarbonUtilException {
+    for (int i = 0; i < intermediateFiles.length; i++) {
+      if (!intermediateFiles[i].delete()) {
+        throw new CarbonUtilException("Problem while deleting intermediate file");
+      }
+    }
+  }
+
+  public static byte[] getKeyArray(ColumnarKeyStoreDataHolder[] columnarKeyStoreDataHolder,
+      int totalKeySize, int eachKeySize) {
+    byte[] completeKeyArray = new byte[totalKeySize];
+    byte[] keyBlockData = null;
+    int destinationPosition = 0;
+    int[] columnIndex = null;
+    int blockKeySize = 0;
+    for (int i = 0; i < columnarKeyStoreDataHolder.length; i++) {
+      keyBlockData = columnarKeyStoreDataHolder[i].getKeyBlockData();
+      blockKeySize = columnarKeyStoreDataHolder[i].getColumnarKeyStoreMetadata().getEachRowSize();
+      if (columnarKeyStoreDataHolder[i].getColumnarKeyStoreMetadata().isSorted()) {
+        for (int j = 0; j < keyBlockData.length; j += blockKeySize) {
+          System.arraycopy(keyBlockData, j, completeKeyArray, destinationPosition, blockKeySize);
+          destinationPosition += eachKeySize;
+        }
+      } else {
+        columnIndex = columnarKeyStoreDataHolder[i].getColumnarKeyStoreMetadata().getColumnIndex();
+
+        for (int j = 0; j < columnIndex.length; j++) {
+          System.arraycopy(keyBlockData, columnIndex[j] * blockKeySize, completeKeyArray,
+              eachKeySize * columnIndex[j] + destinationPosition, blockKeySize);
+        }
+      }
+      destinationPosition = blockKeySize;
+    }
+    return completeKeyArray;
+  }
+
+  public static byte[] getKeyArray(ColumnarKeyStoreDataHolder[] columnarKeyStoreDataHolder,
+      int totalKeySize, int eachKeySize, short[] columnIndex) {
+    byte[] completeKeyArray = new byte[totalKeySize];
+    byte[] keyBlockData = null;
+    int destinationPosition = 0;
+    int blockKeySize = 0;
+    for (int i = 0; i < columnarKeyStoreDataHolder.length; i++) {
+      keyBlockData = columnarKeyStoreDataHolder[i].getKeyBlockData();
+      blockKeySize = columnarKeyStoreDataHolder[i].getColumnarKeyStoreMetadata().getEachRowSize();
+
+      for (int j = 0; j < columnIndex.length; j++) {
+        System.arraycopy(keyBlockData, columnIndex[j] * blockKeySize, completeKeyArray,
+            destinationPosition, blockKeySize);
+        destinationPosition += eachKeySize;
+      }
+      destinationPosition = blockKeySize;
+    }
+    return completeKeyArray;
+  }
+
+  public static int getFirstIndexUsingBinarySearch(FixedLengthDimensionDataChunk dimColumnDataChunk,
+      int low, int high, byte[] compareValue, boolean matchUpLimit) {
+    int cmpResult = 0;
+    while (high >= low) {
+      int mid = (low + high) / 2;
+      cmpResult = ByteUtil.UnsafeComparer.INSTANCE
+          .compareTo(dimColumnDataChunk.getCompleteDataChunk(), mid * compareValue.length,
+              compareValue.length, compareValue, 0, compareValue.length);
+      if (cmpResult < 0) {
+        low = mid + 1;
+      } else if (cmpResult > 0) {
+        high = mid - 1;
+      } else {
+        int currentIndex = mid;
+        if(!matchUpLimit) {
+          while (currentIndex - 1 >= 0 && ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
+                  (currentIndex - 1) * compareValue.length, compareValue.length, compareValue, 0,
+                  compareValue.length) == 0) {
+            --currentIndex;
+          }
+        } else {
+          while (currentIndex + 1 <= high && ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
+                  (currentIndex + 1) * compareValue.length, compareValue.length, compareValue, 0,
+                  compareValue.length) == 0) {
+            currentIndex++;
+          }
+        }
+        return currentIndex;
+      }
+    }
+    return -(low + 1);
+  }
+
+  /**
+   * Method will identify the value which is lesser than the pivot element
+   * on which range filter is been applied.
+   *
+   * @param currentIndex
+   * @param dimColumnDataChunk
+   * @param compareValue
+   * @return index value
+   */
+  public static int nextLesserValueToTarget(int currentIndex,
+      FixedLengthDimensionDataChunk dimColumnDataChunk, byte[] compareValue) {
+    while (currentIndex - 1 >= 0 && ByteUtil.UnsafeComparer.INSTANCE
+        .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
+            (currentIndex - 1) * compareValue.length, compareValue.length, compareValue, 0,
+            compareValue.length) >= 0) {
+      --currentIndex;
+    }
+
+    return --currentIndex;
+  }
+
+  /**
+   * Method will identify the value which is greater than the pivot element
+   * on which range filter is been applied.
+   *
+   * @param currentIndex
+   * @param dimColumnDataChunk
+   * @param compareValue
+   * @param numerOfRows
+   * @return index value
+   */
+  public static int nextGreaterValueToTarget(int currentIndex,
+      FixedLengthDimensionDataChunk dimColumnDataChunk, byte[] compareValue, int numerOfRows) {
+    while (currentIndex + 1 < numerOfRows && ByteUtil.UnsafeComparer.INSTANCE
+        .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
+            (currentIndex + 1) * compareValue.length, compareValue.length, compareValue, 0,
+            compareValue.length) <= 0) {
+      ++currentIndex;
+    }
+
+    return ++currentIndex;
+  }
+
+  public static int[] getUnCompressColumnIndex(int totalLength, byte[] columnIndexData,
+      NumberCompressor numberCompressor) {
+    ByteBuffer buffer = ByteBuffer.wrap(columnIndexData);
+    buffer.rewind();
+    int indexDataLength = buffer.getInt();
+    byte[] indexData = new byte[indexDataLength];
+    byte[] indexMap =
+        new byte[totalLength - indexDataLength - CarbonCommonConstants.INT_SIZE_IN_BYTE];
+    buffer.get(indexData);
+    buffer.get(indexMap);
+    return UnBlockIndexer.uncompressIndex(numberCompressor.unCompress(indexData),
+        numberCompressor.unCompress(indexMap));
+  }
+
+  /**
+   * Convert int array to Integer list
+   *
+   * @param array
+   * @return List<Integer>
+   */
+  public static List<Integer> convertToIntegerList(int[] array) {
+    List<Integer> integers = new ArrayList<Integer>();
+    for (int i = 0; i < array.length; i++) {
+      integers.add(array[i]);
+    }
+    return integers;
+  }
+
+  /**
+   * Read level metadata file and return cardinality
+   *
+   * @param levelPath
+   * @return
+   * @throws CarbonUtilException
+   */
+  public static int[] getCardinalityFromLevelMetadataFile(String levelPath)
+      throws CarbonUtilException {
+    DataInputStream dataInputStream = null;
+    int[] cardinality = null;
+
+    try {
+      if (FileFactory.isFileExist(levelPath, FileFactory.getFileType(levelPath))) {
+        dataInputStream =
+            FileFactory.getDataInputStream(levelPath, FileFactory.getFileType(levelPath));
+
+        cardinality = new int[dataInputStream.readInt()];
+
+        for (int i = 0; i < cardinality.length; i++) {
+          cardinality[i] = dataInputStream.readInt();
+        }
+      }
+    } catch (FileNotFoundException e) {
+      throw new CarbonUtilException("Problem while getting the file", e);
+    } catch (IOException e) {
+      throw new CarbonUtilException("Problem while reading the file", e);
+    } finally {
+      closeStreams(dataInputStream);
+    }
+
+    return cardinality;
+  }
+
+  public static void writeLevelCardinalityFile(String loadFolderLoc, String tableName,
+      int[] dimCardinality) throws KettleException {
+    String levelCardinalityFilePath = loadFolderLoc + File.separator +
+        CarbonCommonConstants.LEVEL_METADATA_FILE + tableName
+        + CarbonCommonConstants.CARBON_METADATA_EXTENSION;
+    FileOutputStream fileOutputStream = null;
+    FileChannel channel = null;
+    try {
+      int dimCardinalityArrLength = dimCardinality.length;
+
+      // first four bytes for writing the length of array, remaining for array data
+      ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE
+          + dimCardinalityArrLength * CarbonCommonConstants.INT_SIZE_IN_BYTE);
+
+      fileOutputStream = new FileOutputStream(levelCardinalityFilePath);
+      channel = fileOutputStream.getChannel();
+      buffer.putInt(dimCardinalityArrLength);
+
+      for (int i = 0; i < dimCardinalityArrLength; i++) {
+        buffer.putInt(dimCardinality[i]);
+      }
+
+      buffer.flip();
+      channel.write(buffer);
+      buffer.clear();
+
+      LOGGER.info("Level cardinality file written to : " + levelCardinalityFilePath);
+    } catch (IOException e) {
+      LOGGER.error("Error while writing level cardinality file : " + levelCardinalityFilePath + e
+          .getMessage());
+      throw new KettleException("Not able to write level cardinality file", e);
+    } finally {
+      closeStreams(channel, fileOutputStream);
+    }
+  }
+
+  /**
+   * From beeline if a delimeter is passed as \001, in code we get it as
+   * escaped string as \\001. So this method will unescape the slash again and
+   * convert it back t0 \001
+   *
+   * @param parseStr
+   * @return
+   */
+  public static String unescapeChar(String parseStr) {
+    switch (parseStr) {
+      case "\\001":
+        return "\001";
+      case "\\t":
+        return "\t";
+      case "\\r":
+        return "\r";
+      case "\\b":
+        return "\b";
+      case "\\f":
+        return "\f";
+      case "\\n":
+        return "\n";
+      default:
+        return parseStr;
+    }
+  }
+
+  public static String escapeComplexDelimiterChar(String parseStr) {
+    switch (parseStr) {
+      case "$":
+        return "\\$";
+      case ":":
+        return "\\:";
+      default:
+        return parseStr;
+    }
+  }
+
+  /**
+   * Append HDFS Base Url for show create & load data sql
+   *
+   * @param filePath
+   */
+  public static String checkAndAppendHDFSUrl(String filePath) {
+    String currentPath = filePath;
+    if (null != filePath && filePath.length() != 0 &&
+        FileFactory.getFileType(filePath) != FileFactory.FileType.HDFS &&
+        FileFactory.getFileType(filePath) != FileFactory.FileType.VIEWFS) {
+      String baseDFSUrl = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL);
+      if (null != baseDFSUrl) {
+        String dfsUrl = conf.get(FS_DEFAULT_FS);
+        if (null != dfsUrl && (dfsUrl.startsWith(HDFS_PREFIX) || dfsUrl
+            .startsWith(VIEWFS_PREFIX))) {
+          baseDFSUrl = dfsUrl + baseDFSUrl;
+        }
+        if (baseDFSUrl.endsWith("/")) {
+          baseDFSUrl = baseDFSUrl.substring(0, baseDFSUrl.length() - 1);
+        }
+        if (!filePath.startsWith("/")) {
+          filePath = "/" + filePath;
+        }
+        currentPath = baseDFSUrl + filePath;
+      }
+    }
+    return currentPath;
+  }
+
+  /**
+   * @param location
+   * @param factTableName
+   * @return
+   */
+  public static int getRestructureNumber(String location, String factTableName) {
+    String restructName =
+        location.substring(location.indexOf(CarbonCommonConstants.RESTRUCTRE_FOLDER));
+    int factTableIndex = restructName.indexOf(factTableName) - 1;
+    String restructNumber =
+        restructName.substring(CarbonCommonConstants.RESTRUCTRE_FOLDER.length(), factTableIndex);
+    return Integer.parseInt(restructNumber);
+  }
+
+  /**
+   * Below method will be used to get the aggregator type
+   * CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE will return when value is double measure
+   * CarbonCommonConstants.BYTE_VALUE_MEASURE will be returned when value is byte array
+   *
+   * @param agg
+   * @return aggregator type
+   */
+  public static char getType(String agg) {
+    if (CarbonCommonConstants.SUM.equals(agg) || CarbonCommonConstants.COUNT.equals(agg)) {
+      return CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE;
+    } else {
+      return CarbonCommonConstants.BYTE_VALUE_MEASURE;
+    }
+  }
+
+  public static String getCarbonStorePath(String databaseName, String tableName) {
+    CarbonProperties prop = CarbonProperties.getInstance();
+    if (null == prop) {
+      return null;
+    }
+    String basePath = prop.getProperty(CarbonCommonConstants.STORE_LOCATION,
+        CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
+    return basePath;
+  }
+
+  /**
+   * This method will check the existence of a file at a given path
+   */
+  public static boolean isFileExists(String fileName) {
+    try {
+      FileFactory.FileType fileType = FileFactory.getFileType(fileName);
+      if (FileFactory.isFileExist(fileName, fileType)) {
+        return true;
+      }
+    } catch (IOException e) {
+      LOGGER.error("@@@@@@  File not found at a given location @@@@@@ : " + fileName);
+    }
+    return false;
+  }
+
+  /**
+   * This method will check and create the given path
+   */
+  public static boolean checkAndCreateFolder(String path) {
+    boolean created = false;
+    try {
+      FileFactory.FileType fileType = FileFactory.getFileType(path);
+      if (FileFactory.isFileExist(path, fileType)) {
+        created = true;
+      } else {
+        created = FileFactory.mkdirs(path, fileType);
+      }
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage());
+    }
+    return created;
+  }
+
+  /**
+   * This method will return the size of a given file
+   */
+  public static long getFileSize(String filePath) {
+    FileFactory.FileType fileType = FileFactory.getFileType(filePath);
+    CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType);
+    return carbonFile.getSize();
+  }
+
+  /**
+   * This method will be used to get bit length of the dimensions based on the
+   * dimension partitioner. If partitioner is value is 1 the column
+   * cardinality will be incremented in such a way it will fit in byte level.
+   * for example if number of bits required to store one column value is 3
+   * bits the 8 bit will be assigned to each value of that column.In this way
+   * we may waste some bits(maximum 7 bits) If partitioner value is more than
+   * 1 then few column are stored together. so cardinality of that group will
+   * be incremented to fit in byte level For example: if cardinality for 3
+   * columns stored together is [1,1,1] then number of bits required will be
+   * [1,1,1] then last value will be incremented and it will become[1,1,6]
+   *
+   * @param dimCardinality cardinality of each column
+   * @param dimPartitioner Partitioner is how column is stored if value is 1 then column
+   *                       wise if value is more than 1 then it is in group with other
+   *                       column
+   * @return number of bits for each column
+   * @TODO for row group only last value is incremented problem in this cases
+   * in if last column in that group is selected most of the time in
+   * filter query Comparison will be more if it incremented uniformly
+   * then comparison will be distributed
+   */
+  public static int[] getDimensionBitLength(int[] dimCardinality, int[] dimPartitioner) {
+    int[] bitLength = new int[dimCardinality.length];
+    int dimCounter = 0;
+    for (int i = 0; i < dimPartitioner.length; i++) {
+      if (dimPartitioner[i] == 1) {
+        // for columnar store
+        // fully filled bits means complete byte or number of bits
+        // assigned will be in
+        // multiplication of 8
+        bitLength[dimCounter] = getBitLengthFullyFilled(dimCardinality[dimCounter]);
+        dimCounter++;
+      } else {
+        // for row store
+        int totalSize = 0;
+        for (int j = 0; j < dimPartitioner[i]; j++) {
+          bitLength[dimCounter] = getIncrementedCardinality(dimCardinality[dimCounter]);
+          totalSize += bitLength[dimCounter];
+          dimCounter++;
+        }
+        // below code is to increment in such a way that row group will
+        // be stored
+        // as byte level
+        int mod = totalSize % 8;
+        if (mod > 0) {
+          bitLength[dimCounter - 1] = bitLength[dimCounter - 1] + (8 - mod);
+        }
+      }
+    }
+    return bitLength;
+  }
+
+  /**
+   * Below method will be used to get the value compression model of the
+   * measure data chunk
+   *
+   * @param measureDataChunkList
+   * @return value compression model
+   */
+  public static ValueCompressionModel getValueCompressionModel(
+      List<DataChunk> measureDataChunkList) {
+    Object[] maxValue = new Object[measureDataChunkList.size()];
+    Object[] minValue = new Object[measureDataChunkList.size()];
+    Object[] uniqueValue = new Object[measureDataChunkList.size()];
+    int[] decimal = new int[measureDataChunkList.size()];
+    char[] type = new char[measureDataChunkList.size()];
+    byte[] dataTypeSelected = new byte[measureDataChunkList.size()];
+
+    /**
+     * to fill the meta data required for value compression model
+     */
+    for (int i = 0; i < dataTypeSelected.length; i++) {
+      int indexOf = measureDataChunkList.get(i).getEncodingList().indexOf(Encoding.DELTA);
+      if (indexOf > -1) {
+        ValueEncoderMeta valueEncoderMeta =
+            measureDataChunkList.get(i).getValueEncoderMeta().get(indexOf);
+        maxValue[i] = valueEncoderMeta.getMaxValue();
+        minValue[i] = valueEncoderMeta.getMinValue();
+        uniqueValue[i] = valueEncoderMeta.getUniqueValue();
+        decimal[i] = valueEncoderMeta.getDecimal();
+        type[i] = valueEncoderMeta.getType();
+        dataTypeSelected[i] = valueEncoderMeta.getDataTypeSelected();
+      }
+    }
+    MeasureMetaDataModel measureMetadataModel =
+        new MeasureMetaDataModel(minValue, maxValue, decimal, dataTypeSelected.length, uniqueValue,
+            type, dataTypeSelected);
+    return ValueCompressionUtil.getValueCompressionModel(measureMetadataModel);
+  }
+
+  /**
+   * Below method will be used to check whether particular encoding is present
+   * in the dimension or not
+   *
+   * @param encoding  encoding to search
+   * @return if encoding is present in dimension
+   */
+  public static boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
+    return encodings.contains(encoding);
+  }
+
+  /**
+   * below method is to check whether data type is present in the data type array
+   *
+   * @param dataType  data type to be searched
+   * @param dataTypes all data types
+   * @return if data type is present
+   */
+  public static boolean hasDataType(DataType dataType, DataType[] dataTypes) {
+    for (int i = 0; i < dataTypes.length; i++) {
+      if (dataType.equals(dataTypes[i])) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * below method is to check whether it is complex data type
+   *
+   * @param dataType  data type to be searched
+   * @return if data type is present
+   */
+  public static boolean hasComplexDataType(DataType dataType) {
+    switch (dataType) {
+      case ARRAY :
+      case STRUCT:
+      case MAP:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  public static boolean[] getDictionaryEncodingArray(QueryDimension[] queryDimensions) {
+    boolean[] dictionaryEncodingArray = new boolean[queryDimensions.length];
+    for (int i = 0; i < queryDimensions.length; i++) {
+      dictionaryEncodingArray[i] =
+          queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY);
+    }
+    return dictionaryEncodingArray;
+  }
+
+  public static boolean[] getDirectDictionaryEncodingArray(QueryDimension[] queryDimensions) {
+    boolean[] dictionaryEncodingArray = new boolean[queryDimensions.length];
+    for (int i = 0; i < queryDimensions.length; i++) {
+      dictionaryEncodingArray[i] =
+          queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY);
+    }
+    return dictionaryEncodingArray;
+  }
+
+  public static boolean[] getComplexDataTypeArray(QueryDimension[] queryDimensions) {
+    boolean[] dictionaryEncodingArray = new boolean[queryDimensions.length];
+    for (int i = 0; i < queryDimensions.length; i++) {
+      dictionaryEncodingArray[i] =
+          CarbonUtil.hasComplexDataType(queryDimensions[i].getDimension().getDataType());
+    }
+    return dictionaryEncodingArray;
+  }
+
+  /**
+   * Below method will be used to read the data file matadata
+   *
+   * @param filePath file path
+   * @param blockOffset   offset in the file
+   * @return Data file metadata instance
+   * @throws CarbonUtilException
+   */
+  public static DataFileFooter readMetadatFile(String filePath, long blockOffset, long blockLength)
+      throws CarbonUtilException {
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    try {
+      return fileFooterConverter.readDataFileFooter(filePath, blockOffset, blockLength);
+    } catch (IOException e) {
+      throw new CarbonUtilException("Problem while reading the file metadata", e);
+    }
+  }
+
+  /**
+   * Below method will be used to get the surrogate key
+   *
+   * @param data   actual data
+   * @param buffer byte buffer which will be used to convert the data to integer value
+   * @return surrogate key
+   */
+  public static int getSurrogateKey(byte[] data, ByteBuffer buffer) {
+    int lenght = 4 - data.length;
+    for (int i = 0; i < lenght; i++) {
+      buffer.put((byte) 0);
+    }
+    buffer.put(data);
+    buffer.rewind();
+    int surrogate = buffer.getInt();
+    buffer.clear();
+    return surrogate;
+  }
+
+  /**
+   * Thread to delete the tables
+   *
+   */
+  private static final class DeleteFolderAndFiles implements Callable<Void> {
+    private CarbonFile file;
+
+    private DeleteFolderAndFiles(CarbonFile file) {
+      this.file = file;
+    }
+
+    @Override public Void call() throws Exception {
+      deleteFoldersAndFiles(file);
+      return null;
+    }
+
+  }
+
+  /**
+   * class to sort aggregate folder list in descending order
+   */
+  private static class AggTableComparator implements Comparator<String> {
+    public int compare(String aggTable1, String aggTable2) {
+      int index1 = aggTable1.lastIndexOf(CarbonCommonConstants.UNDERSCORE);
+      int index2 = aggTable2.lastIndexOf(CarbonCommonConstants.UNDERSCORE);
+      int n1 = Integer.parseInt(aggTable1.substring(index1 + 1));
+      int n2 = Integer.parseInt(aggTable2.substring(index2 + 1));
+      if (n1 > n2) {
+        return -1;
+      } else if (n1 < n2) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+  }
+
+  /**
+   * Below method will be used to get the dimension
+   *
+   * @param tableDimensionList table dimension list
+   * @return boolean array specifying true if dimension is dictionary
+   * and false if dimension is not a dictionary column
+   */
+  public static boolean[] identifyDimensionType(List<CarbonDimension> tableDimensionList) {
+    List<Boolean> isDictionaryDimensions = new ArrayList<Boolean>();
+    Set<Integer> processedColumnGroup = new HashSet<Integer>();
+    for (CarbonDimension carbonDimension : tableDimensionList) {
+      List<CarbonDimension> childs = carbonDimension.getListOfChildDimensions();
+      //assuming complex dimensions will always be atlast
+      if(null != childs && childs.size() > 0) {
+        break;
+      }
+      if (carbonDimension.isColumnar() && hasEncoding(carbonDimension.getEncoder(),
+          Encoding.DICTIONARY)) {
+        isDictionaryDimensions.add(true);
+      } else if (!carbonDimension.isColumnar()) {
+        if (processedColumnGroup.add(carbonDimension.columnGroupId())) {
+          isDictionaryDimensions.add(true);
+        }
+      } else {
+        isDictionaryDimensions.add(false);
+      }
+    }
+    boolean[] primitive = ArrayUtils
+        .toPrimitive(isDictionaryDimensions.toArray(new Boolean[isDictionaryDimensions.size()]));
+    return primitive;
+  }
+
+  /**
+   * This method will form one single byte [] for all the high card dims.
+   * First it will add all the indexes of variable length byte[] and then the
+   * actual value
+   *
+   * @param byteBufferArr
+   * @return byte[] key.
+   */
+  public static byte[] packByteBufferIntoSingleByteArray(ByteBuffer[] byteBufferArr) {
+    // for empty array means there is no data to remove dictionary.
+    if (null == byteBufferArr || byteBufferArr.length == 0) {
+      return null;
+    }
+    int noOfCol = byteBufferArr.length;
+    short offsetLen = (short) (noOfCol * 2);
+    int totalBytes = calculateTotalBytes(byteBufferArr) + offsetLen;
+    ByteBuffer buffer = ByteBuffer.allocate(totalBytes);
+    // writing the offset of the first element.
+    buffer.putShort(offsetLen);
+
+    // prepare index for byte []
+    for (int index = 0; index < byteBufferArr.length - 1; index++) {
+      ByteBuffer individualCol = byteBufferArr[index];
+      int noOfBytes = individualCol.capacity();
+      buffer.putShort((short) (offsetLen + noOfBytes));
+      offsetLen += noOfBytes;
+      individualCol.rewind();
+    }
+
+    // put actual data.
+    for (int index = 0; index < byteBufferArr.length; index++) {
+      ByteBuffer individualCol = byteBufferArr[index];
+      buffer.put(individualCol.array());
+    }
+
+    buffer.rewind();
+    return buffer.array();
+
+  }
+
+  /**
+   * To calculate the total bytes in byte Buffer[].
+   *
+   * @param byteBufferArr
+   * @return
+   */
+  private static int calculateTotalBytes(ByteBuffer[] byteBufferArr) {
+    int total = 0;
+    for (int index = 0; index < byteBufferArr.length; index++) {
+      total += byteBufferArr[index].capacity();
+    }
+    return total;
+  }
+
+  /**
+   * Find the dimension from metadata by using unique name. As of now we are
+   * taking level name as unique name. But user needs to give one unique name
+   * for each level,that level he needs to mention in query.
+   *
+   * @param dimensions
+   * @param carbonDim
+   * @return
+   */
+  public static CarbonDimension findDimension(List<CarbonDimension> dimensions, String carbonDim) {
+    CarbonDimension findDim = null;
+    for (CarbonDimension dimension : dimensions) {
+      if (dimension.getColName().equalsIgnoreCase(carbonDim)) {
+        findDim = dimension;
+        break;
+      }
+    }
+    return findDim;
+  }
+
+  /**
+   * This method will be used to clear the dictionary cache after its usage is complete
+   * so that if memory threshold is reached it can evicted from LRU cache
+   *
+   * @param dictionary
+   */
+  public static void clearDictionaryCache(Dictionary dictionary) {
+    if (null != dictionary) {
+      dictionary.clear();
+    }
+  }
+
+  /**
+   * convert from wrapper to external data type
+   *
+   * @param dataType
+   * @return
+   */
+  public static org.apache.carbondata.format.DataType fromWrapperToExternalDataType(
+      DataType dataType) {
+
+    if (null == dataType) {
+      return null;
+    }
+    switch (dataType) {
+      case STRING:
+        return org.apache.carbondata.format.DataType.STRING;
+      case INT:
+        return org.apache.carbondata.format.DataType.INT;
+      case LONG:
+        return org.apache.carbondata.format.DataType.LONG;
+      case DOUBLE:
+        return org.apache.carbondata.format.DataType.DOUBLE;
+      case DECIMAL:
+        return org.apache.carbondata.format.DataType.DECIMAL;
+      case TIMESTAMP:
+        return org.apache.carbondata.format.DataType.TIMESTAMP;
+      case ARRAY:
+        return org.apache.carbondata.format.DataType.ARRAY;
+      case STRUCT:
+        return org.apache.carbondata.format.DataType.STRUCT;
+      default:
+        return org.apache.carbondata.format.DataType.STRING;
+    }
+  }
+
+  /**
+   * convert from external to wrapper data type
+   *
+   * @param dataType
+   * @return
+   */
+  public static DataType fromExternalToWrapperDataType(
+      org.apache.carbondata.format.DataType dataType) {
+    if (null == dataType) {
+      return null;
+    }
+    switch (dataType) {
+      case STRING:
+        return DataType.STRING;
+      case INT:
+        return DataType.INT;
+      case LONG:
+        return DataType.LONG;
+      case DOUBLE:
+        return DataType.DOUBLE;
+      case DECIMAL:
+        return DataType.DECIMAL;
+      case TIMESTAMP:
+        return DataType.TIMESTAMP;
+      case ARRAY:
+        return DataType.ARRAY;
+      case STRUCT:
+        return DataType.STRUCT;
+      default:
+        return DataType.STRING;
+    }
+  }
+  /**
+   * @param dictionaryColumnCardinality
+   * @param wrapperColumnSchemaList
+   * @return It returns formatted cardinality by adding -1 value for NoDictionary columns
+   */
+  public static int[] getFormattedCardinality(int[] dictionaryColumnCardinality,
+      List<ColumnSchema> wrapperColumnSchemaList) {
+    List<Integer> cardinality = new ArrayList<>();
+    int counter = 0;
+    for (int i = 0; i < wrapperColumnSchemaList.size(); i++) {
+      if (CarbonUtil.hasEncoding(wrapperColumnSchemaList.get(i).getEncodingList(),
+          org.apache.carbondata.core.carbon.metadata.encoder.Encoding.DICTIONARY)) {
+        cardinality.add(dictionaryColumnCardinality[counter]);
+        counter++;
+      } else if (!wrapperColumnSchemaList.get(i).isDimensionColumn()) {
+        continue;
+      } else {
+        cardinality.add(-1);
+      }
+    }
+    return ArrayUtils.toPrimitive(cardinality.toArray(new Integer[cardinality.size()]));
+  }
+
+  public static List<ColumnSchema> getColumnSchemaList(List<CarbonDimension> carbonDimensionsList,
+      List<CarbonMeasure> carbonMeasureList) {
+    List<ColumnSchema> wrapperColumnSchemaList = new ArrayList<ColumnSchema>();
+    fillCollumnSchemaListForComplexDims(carbonDimensionsList, wrapperColumnSchemaList);
+    for (CarbonMeasure carbonMeasure : carbonMeasureList) {
+      wrapperColumnSchemaList.add(carbonMeasure.getColumnSchema());
+    }
+    return wrapperColumnSchemaList;
+  }
+
+  private static void fillCollumnSchemaListForComplexDims(
+      List<CarbonDimension> carbonDimensionsList, List<ColumnSchema> wrapperColumnSchemaList) {
+    for (CarbonDimension carbonDimension : carbonDimensionsList) {
+      wrapperColumnSchemaList.add(carbonDimension.getColumnSchema());
+      List<CarbonDimension> childDims = carbonDimension.getListOfChildDimensions();
+      if (null != childDims && childDims.size() > 0) {
+        fillCollumnSchemaListForComplexDims(childDims, wrapperColumnSchemaList);
+      }
+    }
+  }
+  /**
+   * Below method will be used to get all the block index info from index file
+   *
+   * @param taskId                  task id of the file
+   * @param tableBlockInfoList      list of table block
+   * @param absoluteTableIdentifier absolute table identifier
+   * @return list of block info
+   * @throws CarbonUtilException if any problem while reading
+   */
+  public static List<DataFileFooter> readCarbonIndexFile(String taskId,
+      List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier absoluteTableIdentifier)
+      throws CarbonUtilException {
+    // need to sort the  block info list based for task in ascending  order so
+    // it will be sinkup with block index read from file
+    Collections.sort(tableBlockInfoList);
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+    // geting the index file path
+    //TODO need to pass proper partition number when partiton will be supported
+    String carbonIndexFilePath = carbonTablePath
+        .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId());
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    try {
+      // read the index info and return
+      return fileFooterConverter.getIndexInfo(carbonIndexFilePath, tableBlockInfoList);
+    } catch (IOException e) {
+      throw new CarbonUtilException("Problem while reading the file metadata", e);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/CarbonUtilException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtilException.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtilException.java
new file mode 100644
index 0000000..bdeed7f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtilException.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.util.Locale;
+
+public class CarbonUtilException extends Exception {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public CarbonUtilException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public CarbonUtilException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * This method is used to get the localized message.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - Localized error message.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * getLocalizedMessage
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * getMessage
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
new file mode 100644
index 0000000..3a1da8c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.carbon.metadata.blocklet.SegmentInfo;
+import org.apache.carbondata.core.carbon.metadata.blocklet.compressor.ChunkCompressorMeta;
+import org.apache.carbondata.core.carbon.metadata.blocklet.compressor.CompressionCodec;
+import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
+import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.PresenceMeta;
+import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletBTreeIndex;
+import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
+import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.carbon.metadata.blocklet.sort.SortState;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.datastorage.store.FileHolder;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.reader.CarbonFooterReader;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.FileFooter;
+
+/**
+ * Below class will be used to convert the thrift object of data file
+ * meta data to wrapper object
+ */
+public class DataFileFooterConverter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataFileFooterConverter.class.getName());
+
+  /**
+   * Below method will be used to get the index info from index file
+   *
+   * @param filePath           file path of the index file
+   * @param tableBlockInfoList table block index
+   * @return list of index info
+   * @throws IOException problem while reading the index file
+   */
+  public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo> tableBlockInfoList)
+      throws IOException {
+    CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+    List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
+    try {
+      // open the reader
+      indexReader.openThriftReader(filePath);
+      // get the index header
+      org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
+      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+      List<org.apache.carbondata.format.ColumnSchema> table_columns =
+          readIndexHeader.getTable_columns();
+      for (int i = 0; i < table_columns.size(); i++) {
+        columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+      }
+      // get the segment info
+      SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info());
+      BlockletIndex blockletIndex = null;
+      int counter = 0;
+      DataFileFooter dataFileFooter = null;
+      // read the block info from file
+      while (indexReader.hasNext()) {
+        BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
+        blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index());
+        dataFileFooter = new DataFileFooter();
+        dataFileFooter.setBlockletIndex(blockletIndex);
+        dataFileFooter.setColumnInTable(columnSchemaList);
+        dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
+        dataFileFooter.setTableBlockInfo(tableBlockInfoList.get(counter++));
+        dataFileFooter.setSegmentInfo(segmentInfo);
+        dataFileFooters.add(dataFileFooter);
+      }
+    } finally {
+      indexReader.closeThriftReader();
+    }
+    return dataFileFooters;
+  }
+
+  /**
+   * Below method will be used to convert thrift file meta to wrapper file meta
+   */
+  public DataFileFooter readDataFileFooter(String filePath, long blockOffset, long blockLength)
+      throws IOException {
+    DataFileFooter dataFileFooter = new DataFileFooter();
+    FileHolder fileReader = null;
+    try {
+      long completeBlockLength = blockOffset + blockLength;
+      long footerPointer = completeBlockLength - 8;
+      fileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath));
+      long actualFooterOffset = fileReader.readLong(filePath, footerPointer);
+      CarbonFooterReader reader = new CarbonFooterReader(filePath, actualFooterOffset);
+      FileFooter footer = reader.readFooter();
+      dataFileFooter.setVersionId(footer.getVersion());
+      dataFileFooter.setNumberOfRows(footer.getNum_rows());
+      dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
+      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+      List<org.apache.carbondata.format.ColumnSchema> table_columns = footer.getTable_columns();
+      for (int i = 0; i < table_columns.size(); i++) {
+        columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+      }
+      dataFileFooter.setColumnInTable(columnSchemaList);
+
+      List<org.apache.carbondata.format.BlockletIndex> leaf_node_indices_Thrift =
+          footer.getBlocklet_index_list();
+      List<BlockletIndex> blockletIndexList = new ArrayList<BlockletIndex>();
+      for (int i = 0; i < leaf_node_indices_Thrift.size(); i++) {
+        BlockletIndex blockletIndex = getBlockletIndex(leaf_node_indices_Thrift.get(i));
+        blockletIndexList.add(blockletIndex);
+      }
+
+      List<org.apache.carbondata.format.BlockletInfo> leaf_node_infos_Thrift =
+          footer.getBlocklet_info_list();
+      List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
+      for (int i = 0; i < leaf_node_infos_Thrift.size(); i++) {
+        BlockletInfo blockletInfo = getBlockletInfo(leaf_node_infos_Thrift.get(i));
+        blockletInfo.setBlockletIndex(blockletIndexList.get(i));
+        blockletInfoList.add(blockletInfo);
+      }
+      dataFileFooter.setBlockletList(blockletInfoList);
+      dataFileFooter.setBlockletIndex(getBlockletIndexForDataFileFooter(blockletIndexList));
+    } finally {
+      if (null != fileReader) {
+        fileReader.finish();
+      }
+    }
+    return dataFileFooter;
+  }
+
+  /**
+   * Below method will be used to get blocklet index for data file meta
+   *
+   * @param blockletIndexList
+   * @return blocklet index
+   */
+  private BlockletIndex getBlockletIndexForDataFileFooter(List<BlockletIndex> blockletIndexList) {
+    BlockletIndex blockletIndex = new BlockletIndex();
+    BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
+    blockletBTreeIndex.setStartKey(blockletIndexList.get(0).getBtreeIndex().getStartKey());
+    blockletBTreeIndex
+        .setEndKey(blockletIndexList.get(blockletIndexList.size() - 1).getBtreeIndex().getEndKey());
+    blockletIndex.setBtreeIndex(blockletBTreeIndex);
+    byte[][] currentMinValue = blockletIndexList.get(0).getMinMaxIndex().getMinValues().clone();
+    byte[][] currentMaxValue = blockletIndexList.get(0).getMinMaxIndex().getMaxValues().clone();
+    byte[][] minValue = null;
+    byte[][] maxValue = null;
+    for (int i = 1; i < blockletIndexList.size(); i++) {
+      minValue = blockletIndexList.get(i).getMinMaxIndex().getMinValues();
+      maxValue = blockletIndexList.get(i).getMinMaxIndex().getMaxValues();
+      for (int j = 0; j < maxValue.length; j++) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue[j]) > 0) {
+          currentMinValue[j] = minValue[j].clone();
+        }
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMaxValue[j], maxValue[j]) < 0) {
+          currentMaxValue[j] = maxValue[j].clone();
+        }
+      }
+    }
+
+    BlockletMinMaxIndex minMax = new BlockletMinMaxIndex();
+    minMax.setMaxValues(currentMaxValue);
+    minMax.setMinValues(currentMinValue);
+    blockletIndex.setMinMaxIndex(minMax);
+    return blockletIndex;
+  }
+
+  private ColumnSchema thriftColumnSchmeaToWrapperColumnSchema(
+      org.apache.carbondata.format.ColumnSchema externalColumnSchema) {
+    ColumnSchema wrapperColumnSchema = new ColumnSchema();
+    wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
+    wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
+    wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar());
+    wrapperColumnSchema
+        .setDataType(thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type));
+    wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension());
+    List<Encoding> encoders = new ArrayList<Encoding>();
+    for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) {
+      encoders.add(fromExternalToWrapperEncoding(encoder));
+    }
+    wrapperColumnSchema.setEncodingList(encoders);
+    wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child());
+    wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision());
+    wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id());
+    wrapperColumnSchema.setScale(externalColumnSchema.getScale());
+    wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
+    wrapperColumnSchema.setAggregateFunction(externalColumnSchema.getAggregate_function());
+    return wrapperColumnSchema;
+  }
+
+  /**
+   * Below method is to convert the blocklet info of the thrift to wrapper
+   * blocklet info
+   *
+   * @param blockletInfoThrift blocklet info of the thrift
+   * @return blocklet info wrapper
+   */
+  private BlockletInfo getBlockletInfo(
+      org.apache.carbondata.format.BlockletInfo blockletInfoThrift) {
+    BlockletInfo blockletInfo = new BlockletInfo();
+    List<DataChunk> dimensionColumnChunk = new ArrayList<DataChunk>();
+    List<DataChunk> measureChunk = new ArrayList<DataChunk>();
+    Iterator<org.apache.carbondata.format.DataChunk> column_data_chunksIterator =
+        blockletInfoThrift.getColumn_data_chunksIterator();
+    if (null != column_data_chunksIterator) {
+      while (column_data_chunksIterator.hasNext()) {
+        org.apache.carbondata.format.DataChunk next = column_data_chunksIterator.next();
+        if (next.isRowMajor()) {
+          dimensionColumnChunk.add(getDataChunk(next, false));
+        } else if (next.getEncoders().contains(org.apache.carbondata.format.Encoding.DELTA)) {
+          measureChunk.add(getDataChunk(next, true));
+        } else {
+          dimensionColumnChunk.add(getDataChunk(next, false));
+        }
+      }
+    }
+    blockletInfo.setDimensionColumnChunk(dimensionColumnChunk);
+    blockletInfo.setMeasureColumnChunk(measureChunk);
+    blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
+    return blockletInfo;
+  }
+
+  /**
+   * Below method is convert the thrift encoding to wrapper encoding
+   *
+   * @param encoderThrift thrift encoding
+   * @return wrapper encoding
+   */
+  private Encoding fromExternalToWrapperEncoding(
+      org.apache.carbondata.format.Encoding encoderThrift) {
+    switch (encoderThrift) {
+      case DICTIONARY:
+        return Encoding.DICTIONARY;
+      case DELTA:
+        return Encoding.DELTA;
+      case RLE:
+        return Encoding.RLE;
+      case INVERTED_INDEX:
+        return Encoding.INVERTED_INDEX;
+      case BIT_PACKED:
+        return Encoding.BIT_PACKED;
+      case DIRECT_DICTIONARY:
+        return Encoding.DIRECT_DICTIONARY;
+      default:
+        return Encoding.DICTIONARY;
+    }
+  }
+
+  /**
+   * Below method will be used to convert the thrift compression to wrapper
+   * compression codec
+   *
+   * @param compressionCodecThrift
+   * @return wrapper compression codec
+   */
+  private CompressionCodec getCompressionCodec(
+      org.apache.carbondata.format.CompressionCodec compressionCodecThrift) {
+    switch (compressionCodecThrift) {
+      case SNAPPY:
+        return CompressionCodec.SNAPPY;
+      default:
+        return CompressionCodec.SNAPPY;
+    }
+  }
+
+  /**
+   * Below method will be used to convert thrift segment object to wrapper
+   * segment object
+   *
+   * @param segmentInfo thrift segment info object
+   * @return wrapper segment info object
+   */
+  private SegmentInfo getSegmentInfo(org.apache.carbondata.format.SegmentInfo segmentInfo) {
+    SegmentInfo info = new SegmentInfo();
+    int[] cardinality = new int[segmentInfo.getColumn_cardinalities().size()];
+    for (int i = 0; i < cardinality.length; i++) {
+      cardinality[i] = segmentInfo.getColumn_cardinalities().get(i);
+    }
+    info.setColumnCardinality(cardinality);
+    info.setNumberOfColumns(segmentInfo.getNum_cols());
+    return info;
+  }
+
+  /**
+   * Below method will be used to convert the blocklet index of thrift to
+   * wrapper
+   *
+   * @param blockletIndexThrift
+   * @return blocklet index wrapper
+   */
+  private BlockletIndex getBlockletIndex(
+      org.apache.carbondata.format.BlockletIndex blockletIndexThrift) {
+    org.apache.carbondata.format.BlockletBTreeIndex btreeIndex =
+        blockletIndexThrift.getB_tree_index();
+    org.apache.carbondata.format.BlockletMinMaxIndex minMaxIndex =
+        blockletIndexThrift.getMin_max_index();
+    return new BlockletIndex(
+        new BlockletBTreeIndex(btreeIndex.getStart_key(), btreeIndex.getEnd_key()),
+        new BlockletMinMaxIndex(minMaxIndex.getMin_values(), minMaxIndex.getMax_values()));
+  }
+
+  /**
+   * Below method will be used to convert the thrift compression meta to
+   * wrapper chunk compression meta
+   *
+   * @param chunkCompressionMetaThrift
+   * @return chunkCompressionMetaWrapper
+   */
+  private ChunkCompressorMeta getChunkCompressionMeta(
+      org.apache.carbondata.format.ChunkCompressionMeta chunkCompressionMetaThrift) {
+    ChunkCompressorMeta compressorMeta = new ChunkCompressorMeta();
+    compressorMeta
+        .setCompressor(getCompressionCodec(chunkCompressionMetaThrift.getCompression_codec()));
+    compressorMeta.setCompressedSize(chunkCompressionMetaThrift.getTotal_compressed_size());
+    compressorMeta.setUncompressedSize(chunkCompressionMetaThrift.getTotal_uncompressed_size());
+    return compressorMeta;
+  }
+
+  /**
+   * Below method will be used to convert the thrift data type to wrapper data
+   * type
+   *
+   * @param dataTypeThrift
+   * @return dataType wrapper
+   */
+  private DataType thriftDataTyopeToWrapperDataType(
+      org.apache.carbondata.format.DataType dataTypeThrift) {
+    switch (dataTypeThrift) {
+      case STRING:
+        return DataType.STRING;
+      case SHORT:
+        return DataType.SHORT;
+      case INT:
+        return DataType.INT;
+      case LONG:
+        return DataType.LONG;
+      case DOUBLE:
+        return DataType.DOUBLE;
+      case DECIMAL:
+        return DataType.DECIMAL;
+      case TIMESTAMP:
+        return DataType.TIMESTAMP;
+      case ARRAY:
+        return DataType.ARRAY;
+      case STRUCT:
+        return DataType.STRUCT;
+      default:
+        return DataType.STRING;
+    }
+  }
+
+  /**
+   * Below method will be used to convert the thrift presence meta to wrapper
+   * presence meta
+   *
+   * @param presentMetadataThrift
+   * @return wrapper presence meta
+   */
+  private PresenceMeta getPresenceMeta(
+      org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
+    PresenceMeta presenceMeta = new PresenceMeta();
+    presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
+    presenceMeta.setBitSet(BitSet.valueOf(presentMetadataThrift.getPresent_bit_stream()));
+    return presenceMeta;
+  }
+
+  /**
+   * Below method will be used to convert the thrift object to wrapper object
+   *
+   * @param sortStateThrift
+   * @return wrapper sort state object
+   */
+  private SortState getSortState(org.apache.carbondata.format.SortState sortStateThrift) {
+    if (sortStateThrift == org.apache.carbondata.format.SortState.SORT_EXPLICIT) {
+      return SortState.SORT_EXPLICT;
+    } else if (sortStateThrift == org.apache.carbondata.format.SortState.SORT_NATIVE) {
+      return SortState.SORT_NATIVE;
+    } else {
+      return SortState.SORT_NONE;
+    }
+  }
+
+  /**
+   * Below method will be used to convert the thrift data chunk to wrapper
+   * data chunk
+   *
+   * @param datachunkThrift
+   * @return wrapper data chunk
+   */
+  private DataChunk getDataChunk(org.apache.carbondata.format.DataChunk datachunkThrift,
+      boolean isPresenceMetaPresent) {
+    DataChunk dataChunk = new DataChunk();
+    dataChunk.setColumnUniqueIdList(datachunkThrift.getColumn_ids());
+    dataChunk.setDataPageLength(datachunkThrift.getData_page_length());
+    dataChunk.setDataPageOffset(datachunkThrift.getData_page_offset());
+    if (isPresenceMetaPresent) {
+      dataChunk.setNullValueIndexForColumn(getPresenceMeta(datachunkThrift.getPresence()));
+    }
+    dataChunk.setRlePageLength(datachunkThrift.getRle_page_length());
+    dataChunk.setRlePageOffset(datachunkThrift.getRle_page_offset());
+    dataChunk.setRowMajor(datachunkThrift.isRowMajor());
+    dataChunk.setRowIdPageLength(datachunkThrift.getRowid_page_length());
+    dataChunk.setRowIdPageOffset(datachunkThrift.getRowid_page_offset());
+    dataChunk.setSortState(getSortState(datachunkThrift.getSort_state()));
+    dataChunk.setChunkCompressionMeta(getChunkCompressionMeta(datachunkThrift.getChunk_meta()));
+    List<Encoding> encodingList = new ArrayList<Encoding>(datachunkThrift.getEncoders().size());
+    for (int i = 0; i < datachunkThrift.getEncoders().size(); i++) {
+      encodingList.add(fromExternalToWrapperEncoding(datachunkThrift.getEncoders().get(i)));
+    }
+    dataChunk.setEncoderList(encodingList);
+    if (encodingList.contains(Encoding.DELTA)) {
+      List<ByteBuffer> thriftEncoderMeta = datachunkThrift.getEncoder_meta();
+      List<ValueEncoderMeta> encodeMetaList =
+          new ArrayList<ValueEncoderMeta>(thriftEncoderMeta.size());
+      for (int i = 0; i < thriftEncoderMeta.size(); i++) {
+        encodeMetaList.add(deserializeEncoderMeta(thriftEncoderMeta.get(i).array()));
+      }
+      dataChunk.setValueEncoderMeta(encodeMetaList);
+    }
+    return dataChunk;
+  }
+
+  /**
+   * Below method will be used to convert the encode metadata to
+   * ValueEncoderMeta object
+   *
+   * @param encoderMeta
+   * @return ValueEncoderMeta object
+   */
+  private ValueEncoderMeta deserializeEncoderMeta(byte[] encoderMeta) {
+    // TODO : should remove the unnecessary fields.
+    ByteArrayInputStream aos = null;
+    ObjectInputStream objStream = null;
+    ValueEncoderMeta meta = null;
+    try {
+      aos = new ByteArrayInputStream(encoderMeta);
+      objStream = new ObjectInputStream(aos);
+      meta = (ValueEncoderMeta) objStream.readObject();
+    } catch (ClassNotFoundException e) {
+      LOGGER.error(e);
+    } catch (IOException e) {
+      CarbonUtil.closeStreams(objStream);
+    }
+    return meta;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
new file mode 100644
index 0000000..a821fb0
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+import org.apache.commons.lang.NumberUtils;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public final class DataTypeUtil {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataTypeUtil.class.getName());
+
+  /**
+   * This method will convert a given value to its specific type
+   *
+   * @param msrValue
+   * @param dataType
+   * @param carbonMeasure
+   * @return
+   */
+  public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
+      CarbonMeasure carbonMeasure) {
+    switch (dataType) {
+      case DECIMAL:
+        BigDecimal bigDecimal =
+            new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
+        return normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision());
+      case INT:
+        return Double.valueOf(msrValue).longValue();
+      case LONG:
+        return Long.valueOf(msrValue);
+      default:
+        return Double.valueOf(msrValue);
+    }
+  }
+
+  /**
+   * This method will check the digits before dot with the max precision allowed
+   *
+   * @param bigDecimal
+   * @param allowedPrecision precision configured by the user
+   * @return
+   */
+  private static BigDecimal normalizeDecimalValue(BigDecimal bigDecimal, int allowedPrecision) {
+    if (bigDecimal.precision() > allowedPrecision) {
+      return null;
+    }
+    return bigDecimal;
+  }
+
+  /**
+   * This method will return the type of measure based on its data type
+   *
+   * @param dataType
+   * @return
+   */
+  public static char getAggType(DataType dataType) {
+    switch (dataType) {
+      case DECIMAL:
+        return CarbonCommonConstants.BIG_DECIMAL_MEASURE;
+      case INT:
+      case LONG:
+        return CarbonCommonConstants.BIG_INT_MEASURE;
+      default:
+        return CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE;
+    }
+  }
+
+  /**
+   * This method will convert a big decimal value to bytes
+   *
+   * @param num
+   * @return
+   */
+  public static byte[] bigDecimalToByte(BigDecimal num) {
+    BigInteger sig = new BigInteger(num.unscaledValue().toString());
+    int scale = num.scale();
+    byte[] bscale = new byte[] { (byte) (scale) };
+    byte[] buff = sig.toByteArray();
+    byte[] completeArr = new byte[buff.length + bscale.length];
+    System.arraycopy(bscale, 0, completeArr, 0, bscale.length);
+    System.arraycopy(buff, 0, completeArr, bscale.length, buff.length);
+    return completeArr;
+  }
+
+  /**
+   * This method will convert a byte value back to big decimal value
+   *
+   * @param raw
+   * @return
+   */
+  public static BigDecimal byteToBigDecimal(byte[] raw) {
+    int scale = (raw[0] & 0xFF);
+    byte[] unscale = new byte[raw.length - 1];
+    System.arraycopy(raw, 1, unscale, 0, unscale.length);
+    BigInteger sig = new BigInteger(unscale);
+    return new BigDecimal(sig, scale);
+  }
+
+  /**
+   * returns the SqlStatement.Type of corresponding string value
+   *
+   * @param dataTypeStr
+   * @return return the SqlStatement.Type
+   */
+  public static DataType getDataType(String dataTypeStr) {
+    DataType dataType = null;
+    switch (dataTypeStr) {
+      case "TIMESTAMP":
+        dataType = DataType.TIMESTAMP;
+        break;
+      case "STRING":
+        dataType = DataType.STRING;
+        break;
+      case "INT":
+        dataType = DataType.INT;
+        break;
+      case "SHORT":
+        dataType = DataType.SHORT;
+        break;
+      case "LONG":
+        dataType = DataType.LONG;
+        break;
+      case "DOUBLE":
+        dataType = DataType.DOUBLE;
+        break;
+      case "DECIMAL":
+        dataType = DataType.DECIMAL;
+        break;
+      case "ARRAY":
+        dataType = DataType.ARRAY;
+        break;
+      case "STRUCT":
+        dataType = DataType.STRUCT;
+        break;
+      case "MAP":
+      default:
+        dataType = DataType.STRING;
+    }
+    return dataType;
+  }
+
+  /**
+   * Below method will be used to basically to know whether the input data is valid string of
+   * giving data type. If there is any non parseable string is present return false.
+   */
+  public static boolean isValidData(String data, DataType actualDataType) {
+    if (null == data) {
+      return false;
+    }
+    try {
+      switch (actualDataType) {
+        case SHORT:
+        case INT:
+        case LONG:
+        case DOUBLE:
+        case DECIMAL:
+          return NumberUtils.isNumber(data);
+        case TIMESTAMP:
+          if (data.isEmpty()) {
+            return false;
+          }
+          SimpleDateFormat parser = new SimpleDateFormat(CarbonProperties.getInstance()
+              .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+                  CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+          try {
+            parser.parse(data);
+            return true;
+          } catch (ParseException e) {
+            return false;
+          }
+        default:
+          return true;
+      }
+    } catch (NumberFormatException ex) {
+      return false;
+    }
+  }
+
+  /**
+   * Below method will be used to convert the data passed to its actual data
+   * type
+   *
+   * @param data           data
+   * @param actualDataType actual data type
+   * @return actual data after conversion
+   */
+  public static Object getDataBasedOnDataType(String data, DataType actualDataType) {
+
+    if (null == data || CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(data)) {
+      return null;
+    }
+    try {
+      switch (actualDataType) {
+        case INT:
+          if (data.isEmpty()) {
+            return null;
+          }
+          return Integer.parseInt(data);
+        case SHORT:
+          if (data.isEmpty()) {
+            return null;
+          }
+          return Short.parseShort(data);
+        case DOUBLE:
+          if (data.isEmpty()) {
+            return null;
+          }
+          return Double.parseDouble(data);
+        case LONG:
+          if (data.isEmpty()) {
+            return null;
+          }
+          return Long.parseLong(data);
+        case TIMESTAMP:
+          if (data.isEmpty()) {
+            return null;
+          }
+          SimpleDateFormat parser = new SimpleDateFormat(CarbonProperties.getInstance()
+              .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+                  CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+          Date dateToStr = null;
+          try {
+            dateToStr = parser.parse(data);
+            return dateToStr.getTime() * 1000;
+          } catch (ParseException e) {
+            LOGGER.error("Cannot convert" + data + " to Time/Long type value" + e.getMessage());
+            return null;
+          }
+        case DECIMAL:
+          if (data.isEmpty()) {
+            return null;
+          }
+          java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data);
+          scala.math.BigDecimal scalaDecVal = new scala.math.BigDecimal(javaDecVal);
+          org.apache.spark.sql.types.Decimal decConverter =
+              new org.apache.spark.sql.types.Decimal();
+          return decConverter.set(scalaDecVal);
+        default:
+          return UTF8String.fromString(data);
+      }
+    } catch (NumberFormatException ex) {
+      LOGGER.error("Problem while converting data type" + data);
+      return null;
+    }
+
+  }
+
+  public static Object getMeasureDataBasedOnDataType(Object data, DataType dataType) {
+
+    if (null == data) {
+      return null;
+    }
+    try {
+      switch (dataType) {
+        case DOUBLE:
+          return data;
+        case LONG:
+          return data;
+        case DECIMAL:
+          java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data.toString());
+          scala.math.BigDecimal scalaDecVal = new scala.math.BigDecimal(javaDecVal);
+          org.apache.spark.sql.types.Decimal decConverter =
+              new org.apache.spark.sql.types.Decimal();
+          return decConverter.set(scalaDecVal);
+        default:
+          return data;
+      }
+    } catch (NumberFormatException ex) {
+      LOGGER.error("Problem while converting data type" + data);
+      return null;
+    }
+
+  }
+
+  /**
+   * Below method will be used to basically to know whether any non parseable
+   * data is present or not. if present then return null so that system can
+   * process to default null member value.
+   *
+   * @param data           data
+   * @param actualDataType actual data type
+   * @return actual data after conversion
+   */
+  public static Object normalizeIntAndLongValues(String data, DataType actualDataType) {
+    if (null == data) {
+      return null;
+    }
+    try {
+      Object parsedValue = null;
+      switch (actualDataType) {
+        case INT:
+          parsedValue = Integer.parseInt(data);
+          break;
+        case LONG:
+          parsedValue = Long.parseLong(data);
+          break;
+        default:
+          return data;
+      }
+      if(null != parsedValue) {
+        return data;
+      }
+      return null;
+    } catch (NumberFormatException ex) {
+      return null;
+    }
+  }
+
+  /**
+   * This method will parse a given string value corresponding to its data type
+   *
+   * @param value     value to parse
+   * @param dimension dimension to get data type and precision and scale in case of decimal
+   *                  data type
+   * @return
+   */
+  public static String normalizeColumnValueForItsDataType(String value, CarbonDimension dimension) {
+    try {
+      Object parsedValue = null;
+      // validation will not be done for timestamp datatype as for timestamp direct dictionary
+      // is generated. No dictionary file is created for timestamp datatype column
+      switch (dimension.getDataType()) {
+        case DECIMAL:
+          return parseStringToBigDecimal(value, dimension);
+        case INT:
+        case LONG:
+          parsedValue = normalizeIntAndLongValues(value, dimension.getDataType());
+          break;
+        case DOUBLE:
+          parsedValue = Double.parseDouble(value);
+          break;
+        default:
+          return value;
+      }
+      if (null != parsedValue) {
+        return value;
+      }
+      return null;
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  /**
+   * This method will parse a value to its datatype if datatype is decimal else will return
+   * the value passed
+   *
+   * @param value     value to be parsed
+   * @param dimension
+   * @return
+   */
+  public static String parseValue(String value, CarbonDimension dimension) {
+    try {
+      switch (dimension.getDataType()) {
+        case DECIMAL:
+          return parseStringToBigDecimal(value, dimension);
+        default:
+          return value;
+      }
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  private static String parseStringToBigDecimal(String value, CarbonDimension dimension) {
+    BigDecimal bigDecimal = new BigDecimal(value)
+        .setScale(dimension.getColumnSchema().getScale(), RoundingMode.HALF_UP);
+    BigDecimal normalizedValue =
+        normalizeDecimalValue(bigDecimal, dimension.getColumnSchema().getPrecision());
+    if (null != normalizedValue) {
+      return normalizedValue.toString();
+    }
+    return null;
+  }
+}