You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2020/07/28 13:10:36 UTC

[carbondata] branch master updated: [CARBONDATA-3889] Cleanup duplicated code in carbondata-core module

This is an automated email from the ASF dual-hosted git repository.

manhua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 0367516  [CARBONDATA-3889] Cleanup duplicated code in carbondata-core module
0367516 is described below

commit 03675164a0065d7e5f9e584e5556bb74b36c0461
Author: QiangCai <qi...@qq.com>
AuthorDate: Thu Jul 23 17:25:45 2020 +0800

    [CARBONDATA-3889] Cleanup duplicated code in carbondata-core module
    
    Why is this PR needed?
    There are duplicated code in carbondata-core module
    
    What changes were proposed in this PR?
    1.Cleanup duplicated code in carbondata-core module
    2.Fix the typo
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3860
---
 .../core/datastore/block/SegmentProperties.java    |   8 +-
 .../chunk/impl/FixedLengthDimensionColumnPage.java |  57 ++++------
 .../safe/SafeAbstractDimensionDataChunkStore.java  |   6 +-
 .../datastore/filesystem/AlluxioCarbonFile.java    |   2 +-
 .../core/datastore/impl/FileFactory.java           |  18 +---
 .../page/encoding/DefaultEncodingFactory.java      |  12 +--
 .../apache/carbondata/core/index/IndexUtil.java    |  27 +----
 .../core/index/dev/expr/AndIndexExprWrapper.java   |  35 ++----
 .../core/indexstore/BlockletIndexStore.java        |   2 +-
 .../core/indexstore/UnsafeMemoryDMStore.java       |   2 +-
 .../core/indexstore/blockletindex/BlockIndex.java  |   2 +-
 .../core/indexstore/schema/SchemaGenerator.java    |  58 ++++------
 ...java => AbstractDirectDictionaryGenerator.java} | 101 ++++--------------
 .../timestamp/DateDirectDictionaryGenerator.java   |  90 +---------------
 .../TimeStampDirectDictionaryGenerator.java        |  95 +----------------
 .../carbondata/core/keygenerator/mdkey/Bits.java   |  80 --------------
 .../ThriftWrapperSchemaConverterImpl.java          |   4 +-
 .../core/reader/CarbonDeleteFilesDataReader.java   |  46 --------
 .../scan/filter/FilterExpressionProcessor.java     |  54 +---------
 .../carbondata/core/scan/filter/FilterUtil.java    |   2 +-
 .../filter/executer/AndFilterExecutorImpl.java     |   4 +-
 .../filter/executer/ExcludeFilterExecutorImpl.java |  34 +++---
 .../filter/executer/IncludeFilterExecutorImpl.java |   8 +-
 .../executer/MeasureColumnExecutorFilterInfo.java  |   2 +-
 .../scan/filter/resolver/FilterResolverIntf.java   |   4 +-
 .../resolver/RowLevelFilterResolverImpl.java       |   2 +-
 .../resolver/RowLevelRangeFilterResolverImpl.java  |   4 +-
 .../resolverinfo/DimColumnResolvedFilterInfo.java  |   4 +-
 .../resolverinfo/TrueConditionalResolverImpl.java  |   4 +-
 ...apperDirectWithDeleteDeltaAndInvertedIndex.java |   2 +-
 .../scan/scanner/impl/BlockletFilterScanner.java   |  49 ++++-----
 .../core/segmentmeta/SegmentMetaDataInfoStats.java |  25 +----
 .../core/util/AbstractDataFileFooterConverter.java | 117 +++------------------
 .../core/util/CarbonLoadStatisticsImpl.java        | 101 +++++-------------
 .../carbondata/core/util/CarbonProperties.java     |  25 ++---
 .../core/util/DataFileFooterConverter.java         |  15 +--
 .../core/util/DataFileFooterConverterV3.java       |  14 +--
 .../apache/carbondata/core/util/DataTypeUtil.java  | 111 ++++---------------
 .../core/writer/CarbonIndexFileMergeWriter.java    |  37 +++----
 .../core/keygenerator/mdkey/BitsUnitTest.java      |  49 ---------
 40 files changed, 248 insertions(+), 1064 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index fe28a37..2b8a049 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -256,11 +256,11 @@ public class SegmentProperties {
    */
   protected long getFingerPrinter() {
     if (this.fingerPrinter == Long.MAX_VALUE) {
-      long dimensionsFingerPrinter = getFingerprinter(this.dimensions.stream()
+      long dimensionsFingerPrinter = getFingerPrinter(this.dimensions.stream()
               .map(t -> t.getColumnSchema()).collect(Collectors.toList()));
-      long measuresFingerPrinter = getFingerprinter(this.measures.stream()
+      long measuresFingerPrinter = getFingerPrinter(this.measures.stream()
               .map(t -> t.getColumnSchema()).collect(Collectors.toList()));
-      long complexFingerPrinter = getFingerprinter(this.complexDimensions.stream()
+      long complexFingerPrinter = getFingerPrinter(this.complexDimensions.stream()
               .map(t -> t.getColumnSchema()).collect(Collectors.toList()));
       this.fingerPrinter = (dimensionsFingerPrinter >> DIMENSIONS_FINGER_PRINTER_SHIFT)
               ^ (measuresFingerPrinter >> MEASURES_FINGER_PRINTER_SHIFT)
@@ -269,7 +269,7 @@ public class SegmentProperties {
     return this.fingerPrinter;
   }
 
-  private long getFingerprinter(List<ColumnSchema> columns) {
+  private long getFingerPrinter(List<ColumnSchema> columns) {
     int counter = 0;
     ColumnSchema columnSchema = null;
     long fingerprint = Long.MAX_VALUE;
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
index 2ccd7c8..b5b5b7a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datastore.chunk.impl;
 
+import java.util.function.IntUnaryOperator;
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
@@ -106,22 +108,15 @@ public class FixedLengthDimensionColumnPage extends AbstractDimensionColumnPage
     return chunkIndex + 1;
   }
 
-  /**
-   * Fill the data to vector
-   *
-   * @param vectorInfo
-   * @param chunkIndex
-   * @return next column index
-   */
-  @Override
-  public int fillVector(ColumnVectorInfo[] vectorInfo, int chunkIndex) {
+  private int fillVector(ColumnVectorInfo[] vectorInfo, int chunkIndex,
+      IntUnaryOperator function) {
     ColumnVectorInfo columnVectorInfo = vectorInfo[chunkIndex];
     int offset = columnVectorInfo.offset;
     int vectorOffset = columnVectorInfo.vectorOffset;
     int len = columnVectorInfo.size + offset;
     CarbonColumnVector vector = columnVectorInfo.vector;
     for (int j = offset; j < len; j++) {
-      int dict = dataChunkStore.getSurrogate(j);
+      int dict = dataChunkStore.getSurrogate(function.applyAsInt(j));
       if (columnVectorInfo.directDictionaryGenerator == null) {
         vector.putInt(vectorOffset++, dict);
       } else {
@@ -145,6 +140,19 @@ public class FixedLengthDimensionColumnPage extends AbstractDimensionColumnPage
     return chunkIndex + 1;
   }
 
+
+  /**
+   * Fill the data to vector
+   *
+   * @param vectorInfo
+   * @param chunkIndex
+   * @return next column index
+   */
+  @Override
+  public int fillVector(ColumnVectorInfo[] vectorInfo, int chunkIndex) {
+    return fillVector(vectorInfo, chunkIndex, j -> j);
+  }
+
   /**
    * Fill the data to vector
    *
@@ -156,33 +164,6 @@ public class FixedLengthDimensionColumnPage extends AbstractDimensionColumnPage
   @Override
   public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo,
       int chunkIndex) {
-    ColumnVectorInfo columnVectorInfo = vectorInfo[chunkIndex];
-    int offset = columnVectorInfo.offset;
-    int vectorOffset = columnVectorInfo.vectorOffset;
-    int len = columnVectorInfo.size + offset;
-    CarbonColumnVector vector = columnVectorInfo.vector;
-    for (int j = offset; j < len; j++) {
-      int dict = dataChunkStore.getSurrogate(filteredRowId[j]);
-      if (columnVectorInfo.directDictionaryGenerator == null) {
-        vector.putInt(vectorOffset++, dict);
-      } else {
-        Object valueFromSurrogate =
-            columnVectorInfo.directDictionaryGenerator.getValueFromSurrogate(dict);
-        if (valueFromSurrogate == null) {
-          vector.putNull(vectorOffset++);
-        } else {
-          DataType dataType = columnVectorInfo.directDictionaryGenerator.getReturnType();
-          if (dataType == DataTypes.INT) {
-            vector.putInt(vectorOffset++, (int) valueFromSurrogate);
-          } else if (dataType == DataTypes.LONG) {
-            vector.putLong(vectorOffset++, (long) valueFromSurrogate);
-          } else {
-            throw new IllegalArgumentException("unsupported data type: " +
-                columnVectorInfo.directDictionaryGenerator.getReturnType());
-          }
-        }
-      }
-    }
-    return chunkIndex + 1;
+    return fillVector(vectorInfo, chunkIndex, j -> filteredRowId[j]);
   }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbstractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbstractDimensionDataChunkStore.java
index 6725393..1c07b77 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbstractDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbstractDimensionDataChunkStore.java
@@ -48,10 +48,10 @@ public abstract class SafeAbstractDimensionDataChunkStore implements DimensionDa
   /**
    * Constructor
    *
-   * @param isInvertedIdex is inverted index present
+   * @param isInvertedIndex is inverted index present
    */
-  public SafeAbstractDimensionDataChunkStore(boolean isInvertedIdex) {
-    this.isExplicitSorted = isInvertedIdex;
+  public SafeAbstractDimensionDataChunkStore(boolean isInvertedIndex) {
+    this.isExplicitSorted = isInvertedIndex;
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
index ae2cd7c..efeeba2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
@@ -80,7 +80,7 @@ public class AlluxioCarbonFile extends HDFSCarbonFile {
       }
       return false;
     } catch (IOException e) {
-      LOGGER.error("Exception occured: " + e.getMessage(), e);
+      LOGGER.error("Exception occurred: " + e.getMessage(), e);
       return false;
     }
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 1233b5f..1e520b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -132,23 +132,7 @@ public final class FileFactory {
   }
 
   private static FileType getFileTypeWithLowerCase(String path) {
-    String lowerCase = path.toLowerCase();
-    if (lowerCase.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
-      return FileType.HDFS;
-    } else if (lowerCase.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
-      return FileType.ALLUXIO;
-    } else if (lowerCase.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
-      return FileType.VIEWFS;
-    } else if (lowerCase.startsWith(CarbonCommonConstants.S3N_PREFIX) || lowerCase
-        .startsWith(CarbonCommonConstants.S3A_PREFIX) || lowerCase
-        .startsWith(CarbonCommonConstants.S3_PREFIX)) {
-      return FileType.S3;
-    } else if (lowerCase.startsWith(CarbonCommonConstants.LOCAL_FILE_PREFIX) && !configuration
-        .get(CarbonCommonConstants.FS_DEFAULT_FS)
-        .equalsIgnoreCase(CarbonCommonConstants.LOCAL_FS_URI)) {
-      return FileType.HDFS_LOCAL;
-    }
-    return null;
+    return getFileTypeWithActualPath(path.toLowerCase());
   }
 
   private static FileType getFileTypeWithActualPath(String path) {
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index cdcd8e3..b43f566 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -229,17 +229,7 @@ public class DefaultEncodingFactory extends EncodingFactory {
   }
 
   private static DataType compareMinMaxAndSelectDataType(long value) {
-    if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
-      return DataTypes.BYTE;
-    } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
-      return DataTypes.SHORT;
-    } else if (value <= THREE_BYTES_MAX && value >= THREE_BYTES_MIN) {
-      return DataTypes.SHORT_INT;
-    } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
-      return DataTypes.INT;
-    } else {
-      return DataTypes.LONG;
-    }
+    return fitLongMinMax(value, value);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
index 86954b6..bae52a3 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.core.index;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -246,21 +245,21 @@ public class IndexUtil {
         TableIndex index = IndexStoreManager.getInstance()
             .getIndex(table, wrapper.getDistributable().getIndexSchema());
         List<Index> indices = index.getTableIndexes(wrapper.getDistributable());
-        List<ExtendedBlocklet> prunnedBlocklet = new ArrayList<>();
+        List<ExtendedBlocklet> prunedBlocklet = new ArrayList<>();
         if (table.isTransactionalTable()) {
-          prunnedBlocklet.addAll(index.prune(indices, wrapper.getDistributable(),
+          prunedBlocklet.addAll(index.prune(indices, wrapper.getDistributable(),
               indexExprWrapper.getFilterResolverIntf(wrapper.getUniqueId()), partitions));
         } else {
-          prunnedBlocklet
+          prunedBlocklet
               .addAll(index.prune(segmentsToLoad, new IndexFilter(filterResolverIntf),
                   partitions));
         }
         // For all blocklets initialize the detail info so that it can be serialized to the driver.
-        for (ExtendedBlocklet blocklet : prunnedBlocklet) {
+        for (ExtendedBlocklet blocklet : prunedBlocklet) {
           blocklet.getDetailInfo();
           blocklet.setIndexUniqueId(wrapper.getUniqueId());
         }
-        extendedBlocklets.addAll(prunnedBlocklet);
+        extendedBlocklets.addAll(prunedBlocklet);
       }
       return indexExprWrapper.pruneBlocklets(extendedBlocklets);
     }
@@ -329,20 +328,4 @@ public class IndexUtil {
     }
     return segmentList;
   }
-
-  public static String getMaxSegmentID(List<String> segmentList) {
-    double[] segment = new double[segmentList.size()];
-    int i = 0;
-    for (String id : segmentList) {
-      segment[i] = Double.parseDouble(id);
-      i++;
-    }
-    Arrays.sort(segment);
-    String maxId = Double.toString(segment[segmentList.size() - 1]);
-    if (maxId.endsWith(".0")) {
-      maxId = maxId.substring(0, maxId.indexOf("."));
-    }
-    return maxId;
-  }
-
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/index/dev/expr/AndIndexExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/index/dev/expr/AndIndexExprWrapper.java
index a29734a..a3b5167 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/dev/expr/AndIndexExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/dev/expr/AndIndexExprWrapper.java
@@ -47,39 +47,26 @@ public class AndIndexExprWrapper extends IndexExprWrapper {
   }
 
   @Override
-  public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
-      throws IOException {
-    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune);
-    List<ExtendedBlocklet> rightPrune = right.prune(segments, partitionsToPrune);
-    List<ExtendedBlocklet> andBlocklets = new ArrayList<>();
-    for (ExtendedBlocklet blocklet : leftPrune) {
-      if (rightPrune.contains(blocklet)) {
-        andBlocklets.add(blocklet);
-      }
-    }
-    return andBlocklets;
+  public List<ExtendedBlocklet> prune(List<Segment> segments,
+      List<PartitionSpec> partitionsToPrune) throws IOException {
+    return and(left.prune(segments, partitionsToPrune), right.prune(segments, partitionsToPrune));
   }
 
   @Override
   public List<ExtendedBlocklet> prune(IndexInputSplit distributable,
-      List<PartitionSpec> partitionsToPrune)
-          throws IOException {
-    List<ExtendedBlocklet> leftPrune = left.prune(distributable, partitionsToPrune);
-    List<ExtendedBlocklet> rightPrune = right.prune(distributable, partitionsToPrune);
-    List<ExtendedBlocklet> andBlocklets = new ArrayList<>();
-    for (ExtendedBlocklet blocklet : leftPrune) {
-      if (rightPrune.contains(blocklet)) {
-        andBlocklets.add(blocklet);
-      }
-    }
-    return andBlocklets;
+      List<PartitionSpec> partitionsToPrune) throws IOException {
+    return and(left.prune(distributable, partitionsToPrune),
+        right.prune(distributable, partitionsToPrune));
   }
 
   @Override
   public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
       throws IOException {
-    List<ExtendedBlocklet> leftPrune = left.pruneBlocklets(blocklets);
-    List<ExtendedBlocklet> rightPrune = right.pruneBlocklets(blocklets);
+    return and(left.pruneBlocklets(blocklets), right.pruneBlocklets(blocklets));
+  }
+
+  private List<ExtendedBlocklet> and(List<ExtendedBlocklet> leftPrune,
+      List<ExtendedBlocklet> rightPrune) {
     List<ExtendedBlocklet> andBlocklets = new ArrayList<>();
     for (ExtendedBlocklet blocklet : leftPrune) {
       if (rightPrune.contains(blocklet)) {
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
index 7e17673..a4c7102 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
@@ -281,7 +281,7 @@ public class BlockletIndexStore
    * based on task id and will return the map of taskId to table segment
    * map
    *
-   * @return map of taks id to segment mapping
+   * @return map of task id to segment mapping
    * @throws IOException
    */
   private BlockIndex loadAndGetIndex(TableBlockIndexUniqueIdentifier identifier,
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 5e0f579..a8af285 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -101,7 +101,7 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
    *
    * VD: Read based on below logic
    * if not last variable column schema
-   * X = read actual variable column offset based on byte postion added in CarbonRowSchema
+   * X = read actual variable column offset based on byte position added in CarbonRowSchema
    * Y = read next variable column offset (next 4 bytes)
    * get the length
    * len  = (X-Y)
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
index 6126636..18c37ee 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
@@ -525,7 +525,7 @@ public class BlockIndex extends CoarseGrainIndex
    * @param minMaxValueCompare2
    * @param isMinValueComparison
    */
-  private byte[][] compareAndUpdateMinMax(byte[][] minMaxValueCompare1,
+  public static byte[][] compareAndUpdateMinMax(byte[][] minMaxValueCompare1,
       byte[][] minMaxValueCompare2, boolean isMinValueComparison) {
     // Compare and update min max values
     byte[][] updatedMinMaxValues = new byte[minMaxValueCompare1.length][];
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
index 8527101..f092982 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
@@ -32,13 +32,10 @@ import org.apache.carbondata.core.util.BlockletIndexUtil;
 public class SchemaGenerator {
 
   /**
-   * Method for creating blocklet Schema. Each blocklet row will share the same schema
-   *
-   * @param segmentProperties
-   * @return
+   * creating blocklet/block Schema. Each blocklet row will share the same schema
    */
-  public static CarbonRowSchema[] createBlockSchema(SegmentProperties segmentProperties,
-      List<CarbonColumn> minMaxCacheColumns) {
+  private static CarbonRowSchema[] createSchema(SegmentProperties segmentProperties,
+      List<CarbonColumn> minMaxCacheColumns, boolean isBlocklet) {
     List<CarbonRowSchema> indexSchemas = new ArrayList<>();
     // get MinMax Schema
     getMinMaxSchema(segmentProperties, indexSchemas, minMaxCacheColumns);
@@ -59,12 +56,31 @@ public class SchemaGenerator {
     // for storing min max flag for each column which reflects whether min max for a column is
     // written in the metadata or not.
     addMinMaxFlagSchema(segmentProperties, indexSchemas, minMaxCacheColumns);
+    if (isBlocklet) {
+      //for blocklet info
+      indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+      // for number of pages.
+      indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
+      // for relative blocklet id i.e. blocklet id that belongs to a particular part file
+      indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
+    }
     CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
     updateBytePosition(schema);
     return schema;
   }
 
   /**
+   * Method for creating blocklet Schema. Each blocklet row will share the same schema
+   *
+   * @param segmentProperties
+   * @return
+   */
+  public static CarbonRowSchema[] createBlockSchema(SegmentProperties segmentProperties,
+      List<CarbonColumn> minMaxCacheColumns) {
+    return createSchema(segmentProperties, minMaxCacheColumns, false);
+  }
+
+  /**
    * Method to update the byte position which will be used in case of unsafe dm store
    * @see org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java:87
    *
@@ -139,35 +155,7 @@ public class SchemaGenerator {
    */
   public static CarbonRowSchema[] createBlockletSchema(SegmentProperties segmentProperties,
       List<CarbonColumn> minMaxCacheColumns) {
-    List<CarbonRowSchema> indexSchemas = new ArrayList<>();
-    // get MinMax Schema
-    getMinMaxSchema(segmentProperties, indexSchemas, minMaxCacheColumns);
-    // for number of rows.
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT));
-    // for table block path
-    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-    // for version number.
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
-    // for schema updated time.
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
-    // for block footer offset.
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
-    // for locations
-    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-    // for storing block length.
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
-    // for storing min max flag for each column which reflects whether min max for a column is
-    // written in the metadata or not.
-    addMinMaxFlagSchema(segmentProperties, indexSchemas, minMaxCacheColumns);
-    //for blocklet info
-    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-    // for number of pages.
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
-    // for relative blocklet id i.e. blocklet id that belongs to a particular part file
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
-    CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
-    updateBytePosition(schema);
-    return schema;
+    return createSchema(segmentProperties, minMaxCacheColumns, true);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/AbstractDirectDictionaryGenerator.java
similarity index 60%
copy from core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
copy to core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/AbstractDirectDictionaryGenerator.java
index 2325f01..d98cac2 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/AbstractDirectDictionaryGenerator.java
@@ -20,64 +20,35 @@ package org.apache.carbondata.core.keygenerator.directdictionary.timestamp;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
-import java.util.TimeZone;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
 import org.apache.log4j.Logger;
 
-/**
- * The class provides the method to generate dictionary key and getting the actual value from
- * the dictionaryKey for direct dictionary column for TIMESTAMP type.
- */
-public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator {
-
-  public static final int cutOffDate = Integer.MAX_VALUE >> 1;
-  private static final long SECONDS_PER_DAY = 60 * 60 * 24L;
-  public static final long MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L;
+public abstract class AbstractDirectDictionaryGenerator implements DirectDictionaryGenerator {
 
-  private ThreadLocal<SimpleDateFormat> simpleDateFormatLocal = new ThreadLocal<>();
+  protected ThreadLocal<SimpleDateFormat> simpleDateFormatLocal = new ThreadLocal<>();
 
-  private String dateFormat;
+  protected String dateFormat;
 
-  /**
-   * min value supported for date type column
-   */
-  public static final long MIN_VALUE;
-  /**
-   * MAx value supported for date type column
-   */
-  public static final long MAX_VALUE;
-  /**
-   * Logger instance
-   */
   private static final Logger LOGGER =
-      LogServiceFactory.getLogService(DateDirectDictionaryGenerator.class.getName());
-
-  static {
-    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
-    df.setTimeZone(TimeZone.getTimeZone("GMT"));
-    long minValue = 0;
-    long maxValue = 0;
-    try {
-      minValue = df.parse("0001-01-01").getTime();
-      maxValue = df.parse("9999-12-31").getTime();
-    } catch (ParseException e) {
-      // the Exception will not occur as constant value is being parsed
-    }
-    MIN_VALUE = minValue;
-    MAX_VALUE = maxValue;
-  }
+      LogServiceFactory.getLogService(AbstractDirectDictionaryGenerator.class.getName());
 
-  public DateDirectDictionaryGenerator(String dateFormat) {
+  public AbstractDirectDictionaryGenerator(String dateFormat) {
     this.dateFormat = dateFormat;
     initialize();
   }
 
+  @Override
+  public void initialize() {
+    if (simpleDateFormatLocal.get() == null) {
+      simpleDateFormatLocal.set(new SimpleDateFormat(dateFormat));
+      simpleDateFormatLocal.get().setLenient(false);
+    }
+  }
+
   /**
    * The method take member String as input and converts
    * and returns the dictionary key
@@ -114,7 +85,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
   }
 
   private int getDirectSurrogateForMember(String memberStr) {
-    Date dateToStr = null;
+    Date dateToStr;
     try {
       SimpleDateFormat simpleDateFormat = simpleDateFormatLocal.get();
       if (null == simpleDateFormat) {
@@ -137,28 +108,15 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
     }
   }
 
-  /**
-   * The method take dictionary key as input and returns the
-   *
-   * @param key
-   * @return member value/actual value Date
-   */
-  @Override
-  public Object getValueFromSurrogate(int key) {
-    if (key == CarbonCommonConstants.DIRECT_DICT_VALUE_NULL) {
-      return null;
-    }
-    return key - cutOffDate;
-  }
-
   private int generateDirectSurrogateKeyForNonTimestampType(String memberStr) {
     long timeValue = -1;
     try {
       timeValue = Long.parseLong(memberStr) / 1000;
     } catch (NumberFormatException e) {
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Cannot convert value to Long type value. Value considered as null."
-            + e.getMessage(), e);
+        LOGGER.debug(
+            "Cannot convert " + memberStr + " Long type value. Value considered as null." + e
+                .getMessage());
       }
     }
     if (timeValue == -1) {
@@ -167,27 +125,4 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
       return generateKey(timeValue);
     }
   }
-
-  public int generateKey(long timeValue) {
-    if (timeValue < MIN_VALUE || timeValue > MAX_VALUE) {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Value for date type column is not in valid range. Value considered as null.");
-      }
-      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
-    }
-    return (int) Math.floor((double) timeValue / MILLIS_PER_DAY) + cutOffDate;
-  }
-
-  public void initialize() {
-    if (simpleDateFormatLocal.get() == null) {
-      simpleDateFormatLocal.set(new SimpleDateFormat(dateFormat));
-      simpleDateFormatLocal.get().setLenient(false);
-      simpleDateFormatLocal.get().setTimeZone(TimeZone.getTimeZone("GMT"));
-    }
-  }
-
-  @Override
-  public DataType getReturnType() {
-    return DataTypes.INT;
-  }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
index 2325f01..c836c02 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
@@ -19,12 +19,10 @@ package org.apache.carbondata.core.keygenerator.directdictionary.timestamp;
 
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.TimeZone;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
@@ -34,16 +32,11 @@ import org.apache.log4j.Logger;
  * The class provides the method to generate dictionary key and getting the actual value from
  * the dictionaryKey for direct dictionary column for TIMESTAMP type.
  */
-public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator {
+public class DateDirectDictionaryGenerator extends AbstractDirectDictionaryGenerator {
 
   public static final int cutOffDate = Integer.MAX_VALUE >> 1;
   private static final long SECONDS_PER_DAY = 60 * 60 * 24L;
   public static final long MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L;
-
-  private ThreadLocal<SimpleDateFormat> simpleDateFormatLocal = new ThreadLocal<>();
-
-  private String dateFormat;
-
   /**
    * min value supported for date type column
    */
@@ -74,67 +67,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
   }
 
   public DateDirectDictionaryGenerator(String dateFormat) {
-    this.dateFormat = dateFormat;
-    initialize();
-  }
-
-  /**
-   * The method take member String as input and converts
-   * and returns the dictionary key
-   *
-   * @param memberStr date format string
-   * @return dictionary value
-   */
-  @Override
-  public int generateDirectSurrogateKey(String memberStr) {
-    if (null == memberStr || memberStr.trim().isEmpty() || memberStr
-        .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
-      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
-    }
-    return getDirectSurrogateForMember(memberStr);
-  }
-
-  /**
-   * The method take member String as input and converts
-   * and returns the dictionary key
-   *
-   * @param memberStr date format string
-   * @return dictionary value
-   */
-  public int generateDirectSurrogateKey(String memberStr, String format) {
-    if (null == format) {
-      return generateDirectSurrogateKeyForNonTimestampType(memberStr);
-    } else {
-      if (null == memberStr || memberStr.trim().isEmpty() || memberStr
-          .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
-        return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
-      }
-      return getDirectSurrogateForMember(memberStr);
-    }
-  }
-
-  private int getDirectSurrogateForMember(String memberStr) {
-    Date dateToStr = null;
-    try {
-      SimpleDateFormat simpleDateFormat = simpleDateFormatLocal.get();
-      if (null == simpleDateFormat) {
-        initialize();
-        simpleDateFormat = simpleDateFormatLocal.get();
-      }
-      dateToStr = simpleDateFormat.parse(memberStr);
-    } catch (ParseException e) {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Cannot convert value to Time/Long type value. Value considered as null." + e
-            .getMessage());
-      }
-      dateToStr = null;
-    }
-    //adding +2 to reserve the first cutOffDiff value for null or empty date
-    if (null == dateToStr) {
-      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
-    } else {
-      return generateKey(dateToStr.getTime());
-    }
+    super(dateFormat);
   }
 
   /**
@@ -151,23 +84,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
     return key - cutOffDate;
   }
 
-  private int generateDirectSurrogateKeyForNonTimestampType(String memberStr) {
-    long timeValue = -1;
-    try {
-      timeValue = Long.parseLong(memberStr) / 1000;
-    } catch (NumberFormatException e) {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Cannot convert value to Long type value. Value considered as null."
-            + e.getMessage(), e);
-      }
-    }
-    if (timeValue == -1) {
-      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
-    } else {
-      return generateKey(timeValue);
-    }
-  }
-
+  @Override
   public int generateKey(long timeValue) {
     if (timeValue < MIN_VALUE || timeValue > MAX_VALUE) {
       if (LOGGER.isDebugEnabled()) {
@@ -178,6 +95,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
     return (int) Math.floor((double) timeValue / MILLIS_PER_DAY) + cutOffDate;
   }
 
+  @Override
   public void initialize() {
     if (simpleDateFormatLocal.get() == null) {
       simpleDateFormatLocal.set(new SimpleDateFormat(dateFormat));
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
index e1927e4..905acc6 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
@@ -23,7 +23,6 @@ import java.util.Date;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -39,12 +38,7 @@ import org.apache.log4j.Logger;
  * The class provides the method to generate dictionary key and getting the actual value from
  * the dictionaryKey for direct dictionary column for TIMESTAMP type.
  */
-public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGenerator {
-
-  private ThreadLocal<SimpleDateFormat> simpleDateFormatLocal = new ThreadLocal<>();
-
-  private String dateFormat;
-
+public class TimeStampDirectDictionaryGenerator extends AbstractDirectDictionaryGenerator {
   /**
    * The value of 1 unit of the SECOND, MINUTE, HOUR, or DAY in millis.
    */
@@ -109,8 +103,7 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
   }
 
   public TimeStampDirectDictionaryGenerator(String dateFormat) {
-    this.dateFormat = dateFormat;
-    initialize();
+    super(dateFormat);
   }
 
   public TimeStampDirectDictionaryGenerator() {
@@ -119,65 +112,6 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
   }
 
   /**
-   * The method take member String as input and converts
-   * and returns the dictionary key
-   *
-   * @param memberStr date format string
-   * @return dictionary value
-   */
-  @Override
-  public int generateDirectSurrogateKey(String memberStr) {
-    if (null == memberStr || memberStr.trim().isEmpty() || memberStr
-        .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
-      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
-    }
-    return getDirectSurrogateForMember(memberStr);
-  }
-
-  /**
-   * The method take member String as input and converts
-   * and returns the dictionary key
-   *
-   * @param memberStr date format string
-   * @return dictionary value
-   */
-  public int generateDirectSurrogateKey(String memberStr, String format) {
-    if (null == format) {
-      return generateDirectSurrogateKeyForNonTimestampType(memberStr);
-    } else {
-      if (null == memberStr || memberStr.trim().isEmpty() || memberStr
-          .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
-        return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
-      }
-      return getDirectSurrogateForMember(memberStr);
-    }
-  }
-
-  private int getDirectSurrogateForMember(String memberStr) {
-    Date dateToStr = null;
-    try {
-      SimpleDateFormat simpleDateFormat = simpleDateFormatLocal.get();
-      if (null == simpleDateFormat) {
-        initialize();
-        simpleDateFormat = simpleDateFormatLocal.get();
-      }
-      dateToStr = simpleDateFormat.parse(memberStr);
-    } catch (ParseException e) {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Cannot convert value to Time/Long type value. Value considered as null." + e
-            .getMessage());
-      }
-      dateToStr = null;
-    }
-    //adding +2 to reserve the first cutOffDiff value for null or empty date
-    if (null == dateToStr) {
-      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
-    } else {
-      return generateKey(dateToStr.getTime());
-    }
-  }
-
-  /**
    * The method take dictionary key as input and returns the
    *
    * @param key
@@ -192,24 +126,6 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
     return timeStamp * 1000L;
   }
 
-  private int generateDirectSurrogateKeyForNonTimestampType(String memberStr) {
-    long timeValue = -1;
-    try {
-      timeValue = Long.parseLong(memberStr) / 1000;
-    } catch (NumberFormatException e) {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "Cannot convert " + memberStr + " Long type value. Value considered as null." + e
-                .getMessage());
-      }
-    }
-    if (timeValue == -1) {
-      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL;
-    } else {
-      return generateKey(timeValue);
-    }
-  }
-
   public int generateKey(long timeValue) {
     long time = (timeValue - cutOffTimeStamp) / granularityFactor;
     int keyValue = -1;
@@ -219,13 +135,6 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
     return keyValue < 0 ? CarbonCommonConstants.DIRECT_DICT_VALUE_NULL : keyValue + 2;
   }
 
-  public void initialize() {
-    if (simpleDateFormatLocal.get() == null) {
-      simpleDateFormatLocal.set(new SimpleDateFormat(dateFormat));
-      simpleDateFormatLocal.get().setLenient(false);
-    }
-  }
-
   @Override
   public DataType getReturnType() {
     return DataTypes.LONG;
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/Bits.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/Bits.java
index 548292f..263de8c 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/Bits.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/mdkey/Bits.java
@@ -115,43 +115,6 @@ public class Bits implements Serializable {
     return new int[] { start, end };
   }
 
-  protected long[] get(long[] keys) {
-    long[] words = new long[wSize];
-    int ll = 0;
-    int minLength = Math.min(lens.length, keys.length);
-    for (int i = minLength - 1; i >= 0; i--) {
-
-      long val = keys[i];
-
-      int idx = ll >> 6; // divide by 64 to get the new word index
-      int position = ll & 0x3f; // to ignore sign bit and consider the remaining
-      val = val & (LONG_MAX >> (MAX_LENGTH - lens[i])); // To control the
-      // logic so that
-      // any val do not
-      // exceed the
-      // cardinality
-      long mask = (val << position);
-      long word = words[idx];
-      words[idx] = (word | mask);
-      ll += lens[i];
-
-      int nextIndex = ll >> 6; // This is divide by 64
-
-      if (nextIndex != idx) {
-        int consideredBits = lens[i] - ll & 0x3f;
-        //Check for spill over only if all the bits are not considered
-        if (consideredBits < lens[i]) {
-          mask = (val >> (lens[i] - ll & 0x3f)); //& (0x7fffffffffffffffL >> (0x3f-pos));
-          word = words[nextIndex];
-          words[nextIndex] = (word | mask);
-        }
-      }
-
-    }
-
-    return words;
-  }
-
   protected long[] get(int[] keys) {
     long[] words = new long[wSize];
     int ll = 0;
@@ -218,13 +181,6 @@ public class Bits implements Serializable {
     return values;
   }
 
-  public byte[] getBytes(long[] keys) {
-
-    long[] words = get(keys);
-
-    return getBytesVal(words);
-  }
-
   private byte[] getBytesVal(long[] words) {
     int length = 8;
     byte[] bytes = new byte[byteSize];
@@ -274,42 +230,6 @@ public class Bits implements Serializable {
     }
 
     return getArray(words);
-
-  }
-
-  public long[] getKeyArray(byte[] key, int[] maskByteRanges) {
-
-    int length = 8;
-    int ls = byteSize;
-    long[] words = new long[wSize];
-    for (int i = 0; i < words.length; i++) {
-      long l = 0;
-      ls -= 8;
-      int m2 = 0;
-      if (ls < 0) {
-        m2 = ls + length;
-        ls = 0;
-      } else {
-        m2 = ls + 8;
-      }
-      if (maskByteRanges == null) {
-        for (int j = ls; j < m2; j++) {
-          l <<= 8;
-          l ^= key[j] & 0xFF;
-        }
-      } else {
-        for (int j = ls; j < m2; j++) {
-          l <<= 8;
-          if (maskByteRanges[j] != -1) {
-            l ^= key[maskByteRanges[j]] & 0xFF;
-          }
-        }
-      }
-      words[i] = l;
-    }
-
-    return getArray(words);
-
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index fa2cf50..6c8e0ed 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -264,7 +264,7 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
   }
 
   /* (non-Javadoc)
-   * convert from wrapper to external tableschema
+   * convert from wrapper to external table schema
    */
   @Override
   public org.apache.carbondata.format.TableSchema fromWrapperToExternalTableSchema(
@@ -304,7 +304,7 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
   }
 
   /* (non-Javadoc)
-   * convert from wrapper to external tableinfo
+   * convert from wrapper to external table info
    */
   @Override
   public org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
index e05e581..b10723b 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -30,7 +29,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
 import org.apache.carbondata.core.mutate.DeleteDeltaBlockletDetails;
 import org.apache.carbondata.core.mutate.DeleteDeltaVo;
@@ -71,50 +69,6 @@ public class CarbonDeleteFilesDataReader {
   }
 
   /**
-   * Returns all deleted records from all specified delta files
-   *
-   * @param deltaFiles
-   * @return
-   * @throws Exception
-   */
-  public Map<Integer, Integer[]> getDeleteDataFromAllFiles(List<String> deltaFiles,
-      String blockletId) throws Exception {
-
-    List<Future<DeleteDeltaBlockDetails>> taskSubmitList = new ArrayList<>();
-    ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
-    for (final String deltaFile : deltaFiles) {
-      taskSubmitList.add(executorService.submit(new DeleteDeltaFileReaderCallable(deltaFile)));
-    }
-    try {
-      executorService.shutdown();
-      executorService.awaitTermination(30, TimeUnit.MINUTES);
-    } catch (InterruptedException e) {
-      LOGGER.error("Error while reading the delete delta files : " + e.getMessage(), e);
-    }
-
-    Map<Integer, Integer[]> pageIdDeleteRowsMap =
-        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (int i = 0; i < taskSubmitList.size(); i++) {
-      try {
-        List<DeleteDeltaBlockletDetails> blockletDetails =
-            taskSubmitList.get(i).get().getBlockletDetails();
-        for (DeleteDeltaBlockletDetails eachBlockletDetails : blockletDetails) {
-          Integer pageId = eachBlockletDetails.getPageId();
-          Set<Integer> rows = blockletDetails
-              .get(blockletDetails.indexOf(new DeleteDeltaBlockletDetails(blockletId, pageId)))
-              .getDeletedRows();
-          pageIdDeleteRowsMap.put(pageId, rows.toArray(new Integer[rows.size()]));
-        }
-
-      } catch (Throwable e) {
-        LOGGER.error(e.getMessage(), e);
-        throw new Exception(e);
-      }
-    }
-    return pageIdDeleteRowsMap;
-  }
-
-  /**
    * Below method will be used to read the delete delta files
    * and get the map of blockletId and page id mapping to deleted
    * rows
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index 0c1ab05..328388a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.core.scan.filter;
 
 import java.util.BitSet;
 
-import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -47,13 +46,7 @@ import org.apache.carbondata.core.scan.filter.resolver.RowLevelRangeFilterResolv
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.FalseConditionalResolverImpl;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.TrueConditionalResolverImpl;
 
-import org.apache.log4j.Logger;
-
 public class FilterExpressionProcessor implements FilterProcessor {
-
-  private static final Logger LOGGER =
-      LogServiceFactory.getLogService(FilterExpressionProcessor.class.getName());
-
   /**
    * Implementation will provide the resolved form of filters based on the
    * filter expression tree which is been passed in Expression instance.
@@ -202,27 +195,6 @@ public class FilterExpressionProcessor implements FilterProcessor {
 
         CarbonColumn column = currentCondExpression.getColumnList().get(0).getCarbonColumn();
         if (currentCondExpression.isSingleColumn() && !column.getDataType().isComplexType()) {
-          if (column.isMeasure()) {
-            if (FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getLeft())
-                && FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getRight()) || (
-                FilterUtil.checkIfRightExpressionRequireEvaluation(currentCondExpression.getRight())
-                    || FilterUtil
-                    .checkIfLeftExpressionRequireEvaluation(currentCondExpression.getLeft()))) {
-              return new RowLevelFilterResolverImpl(expression, isExpressionResolve, true,
-                  tableIdentifier);
-            }
-            if (currentCondExpression.getFilterExpressionType() == ExpressionType.GREATERTHAN
-                || currentCondExpression.getFilterExpressionType() == ExpressionType.LESSTHAN
-                || currentCondExpression.getFilterExpressionType()
-                == ExpressionType.GREATERTHAN_EQUALTO
-                || currentCondExpression.getFilterExpressionType()
-                == ExpressionType.LESSTHAN_EQUALTO) {
-              return new RowLevelRangeFilterResolverImpl(expression, isExpressionResolve, true,
-                  tableIdentifier);
-            }
-            return new ConditionalFilterResolverImpl(expression, isExpressionResolve, true,
-                currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure());
-          }
           // In case of Range Column Dictionary Include we do not need to resolve the range
           // expression as it is already resolved and has the surrogates in the filter value
           if (FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getLeft())
@@ -243,7 +215,7 @@ public class FilterExpressionProcessor implements FilterProcessor {
                 tableIdentifier);
           }
           return new ConditionalFilterResolverImpl(expression, isExpressionResolve, true,
-              currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure());
+              column.isMeasure());
 
         }
         break;
@@ -253,27 +225,6 @@ public class FilterExpressionProcessor implements FilterProcessor {
         currentCondExpression = (BinaryConditionalExpression) expression;
         column = currentCondExpression.getColumnList().get(0).getCarbonColumn();
         if (currentCondExpression.isSingleColumn() && !column.getDataType().isComplexType()) {
-          if (column.isMeasure()) {
-            if (FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getLeft())
-                && FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getRight()) || (
-                FilterUtil.checkIfRightExpressionRequireEvaluation(currentCondExpression.getRight())
-                    || FilterUtil
-                    .checkIfLeftExpressionRequireEvaluation(currentCondExpression.getLeft()))) {
-              return new RowLevelFilterResolverImpl(expression, isExpressionResolve, false,
-                  tableIdentifier);
-            }
-            if (currentCondExpression.getFilterExpressionType() == ExpressionType.GREATERTHAN
-                || currentCondExpression.getFilterExpressionType() == ExpressionType.LESSTHAN
-                || currentCondExpression.getFilterExpressionType()
-                == ExpressionType.GREATERTHAN_EQUALTO
-                || currentCondExpression.getFilterExpressionType()
-                == ExpressionType.LESSTHAN_EQUALTO) {
-              return new RowLevelRangeFilterResolverImpl(expression, isExpressionResolve, false,
-                  tableIdentifier);
-            }
-            return new ConditionalFilterResolverImpl(expression, isExpressionResolve, false, true);
-          }
-
           if (FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getLeft())
               && FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getRight()) || (
               FilterUtil.checkIfRightExpressionRequireEvaluation(currentCondExpression.getRight())
@@ -291,7 +242,8 @@ public class FilterExpressionProcessor implements FilterProcessor {
                 tableIdentifier);
           }
 
-          return new ConditionalFilterResolverImpl(expression, isExpressionResolve, false, false);
+          return new ConditionalFilterResolverImpl(expression, isExpressionResolve, false,
+              column.isMeasure());
         }
         break;
 
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 3053d91..c6c3563 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -112,7 +112,7 @@ public final class FilterUtil {
    * @param segmentProperties
    * @param complexDimensionInfoMap
    * @param minMaxCacheColumns
-   * @param isStreamDataFile: whether create filter executer tree for stream data files
+   * @param isStreamDataFile: whether create filter executor tree for stream data files
    * @return FilterExecutor instance
    *
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecutorImpl.java
index cca4231..dd51af3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecutorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecutorImpl.java
@@ -30,9 +30,9 @@ public class AndFilterExecutorImpl implements FilterExecutor, ImplicitColumnFilt
   private FilterExecutor leftExecutor;
   private FilterExecutor rightExecutor;
 
-  public AndFilterExecutorImpl(FilterExecutor leftExecutor, FilterExecutor rightExecuter) {
+  public AndFilterExecutorImpl(FilterExecutor leftExecutor, FilterExecutor rightExecutor) {
     this.leftExecutor = leftExecutor;
-    this.rightExecutor = rightExecuter;
+    this.rightExecutor = rightExecutor;
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecutorImpl.java
index 845245c..2675042 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecutorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecutorImpl.java
@@ -43,7 +43,7 @@ import org.apache.carbondata.core.util.comparator.SerializableComparator;
 public class ExcludeFilterExecutorImpl implements FilterExecutor {
 
   private DimColumnResolvedFilterInfo dimColEvaluatorInfo;
-  private DimColumnExecutorFilterInfo dimColumnExecuterInfo;
+  private DimColumnExecutorFilterInfo dimColumnExecutorInfo;
   private MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo;
   private MeasureColumnExecutorFilterInfo msrColumnExecutorInfo;
   protected SegmentProperties segmentProperties;
@@ -74,10 +74,10 @@ public class ExcludeFilterExecutorImpl implements FilterExecutor {
     this.segmentProperties = segmentProperties;
     if (!isMeasure) {
       this.dimColEvaluatorInfo = dimColEvaluatorInfo;
-      dimColumnExecuterInfo = new DimColumnExecutorFilterInfo();
+      dimColumnExecutorInfo = new DimColumnExecutorFilterInfo();
 
       FilterUtil.prepareKeysFromSurrogates(dimColEvaluatorInfo.getFilterValues(), segmentProperties,
-          dimColEvaluatorInfo.getDimension(), dimColumnExecuterInfo, null, null);
+          dimColEvaluatorInfo.getDimension(), dimColumnExecutorInfo, null, null);
       isDimensionPresentInCurrentBlock = true;
       isNaturalSorted =
           dimColEvaluatorInfo.getDimension().isUseInvertedIndex() && dimColEvaluatorInfo
@@ -113,7 +113,7 @@ public class ExcludeFilterExecutorImpl implements FilterExecutor {
           dimensionRawColumnChunk.decodeAllColumnPages();
       filterValues = FilterUtil
           .getEncodedFilterValues(dimensionRawColumnChunk.getLocalDictionary(),
-              dimColumnExecuterInfo.filterKeysForExclude);
+              dimColumnExecutorInfo.filterKeysForExclude);
       BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
       for (int i = 0; i < dimensionColumnPages.length; i++) {
         BitSet bitSet = getFilteredIndexes(dimensionColumnPages[i],
@@ -164,7 +164,7 @@ public class ExcludeFilterExecutorImpl implements FilterExecutor {
   @Override
   public boolean applyFilter(RowIntf value, int dimOrdinalMax) {
     if (isDimensionPresentInCurrentBlock) {
-      byte[][] filterValues = dimColumnExecuterInfo.getExcludeFilterKeys();
+      byte[][] filterValues = dimColumnExecutorInfo.getExcludeFilterKeys();
       byte[] col = (byte[])value.getVal(dimColEvaluatorInfo.getDimension().getOrdinal());
       for (int i = 0; i < filterValues.length; i++) {
         if (0 == ByteUtil.UnsafeComparer.INSTANCE.compareTo(col, 0, col.length,
@@ -344,19 +344,19 @@ public class ExcludeFilterExecutorImpl implements FilterExecutor {
   }
 
   private BitSet setFilteredIndexToBitSetWithColumnIndex(
-      DimensionColumnPage dimensionColumnPage, int numerOfRows) {
-    BitSet bitSet = new BitSet(numerOfRows);
-    bitSet.flip(0, numerOfRows);
+      DimensionColumnPage dimensionColumnPage, int numberOfRows) {
+    BitSet bitSet = new BitSet(numberOfRows);
+    bitSet.flip(0, numberOfRows);
     if (filterValues.length == 0) {
       return bitSet;
     }
     int startIndex = 0;
     for (int i = 0; i < filterValues.length; i++) {
-      if (startIndex >= numerOfRows) {
+      if (startIndex >= numberOfRows) {
         break;
       }
       int[] rangeIndex = CarbonUtil
-          .getRangeIndexUsingBinarySearch(dimensionColumnPage, startIndex, numerOfRows - 1,
+          .getRangeIndexUsingBinarySearch(dimensionColumnPage, startIndex, numberOfRows - 1,
               filterValues[i]);
       for (int j = rangeIndex[0]; j <= rangeIndex[1]; j++) {
         bitSet.flip(dimensionColumnPage.getInvertedIndex(j));
@@ -369,9 +369,9 @@ public class ExcludeFilterExecutorImpl implements FilterExecutor {
   }
 
   private BitSet setFilteredIndexToBitSet(DimensionColumnPage dimensionColumnPage,
-      int numerOfRows) {
-    BitSet bitSet = new BitSet(numerOfRows);
-    bitSet.flip(0, numerOfRows);
+      int numberOfRows) {
+    BitSet bitSet = new BitSet(numberOfRows);
+    bitSet.flip(0, numberOfRows);
     // filterValues can be null when the dictionary chunk and surrogate size both are one
     if (filterValues.length == 0) {
       return bitSet;
@@ -380,11 +380,11 @@ public class ExcludeFilterExecutorImpl implements FilterExecutor {
     if (isNaturalSorted && dimensionColumnPage.isExplicitSorted()) {
       int startIndex = 0;
       for (int i = 0; i < filterValues.length; i++) {
-        if (startIndex >= numerOfRows) {
+        if (startIndex >= numberOfRows) {
           break;
         }
         int[] rangeIndex = CarbonUtil
-            .getRangeIndexUsingBinarySearch(dimensionColumnPage, startIndex, numerOfRows - 1,
+            .getRangeIndexUsingBinarySearch(dimensionColumnPage, startIndex, numberOfRows - 1,
                 filterValues[i]);
         for (int j = rangeIndex[0]; j <= rangeIndex[1]; j++) {
           bitSet.flip(j);
@@ -395,7 +395,7 @@ public class ExcludeFilterExecutorImpl implements FilterExecutor {
       }
     } else {
       if (filterValues.length > 1) {
-        for (int i = 0; i < numerOfRows; i++) {
+        for (int i = 0; i < numberOfRows; i++) {
           int index = CarbonUtil.binarySearch(filterValues, 0, filterValues.length - 1,
               dimensionColumnPage, i);
           if (index >= 0) {
@@ -403,7 +403,7 @@ public class ExcludeFilterExecutorImpl implements FilterExecutor {
           }
         }
       } else {
-        for (int j = 0; j < numerOfRows; j++) {
+        for (int j = 0; j < numberOfRows; j++) {
           if (dimensionColumnPage.compareTo(j, filterValues[0]) == 0) {
             bitSet.flip(j);
           }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecutorImpl.java
index 7ab5716..96517dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecutorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecutorImpl.java
@@ -421,18 +421,18 @@ public class IncludeFilterExecutorImpl implements FilterExecutor {
   }
 
   private BitSet setFilteredIndexToBitSetWithColumnIndex(
-      DimensionColumnPage dimensionColumnPage, int numerOfRows) {
-    BitSet bitSet = new BitSet(numerOfRows);
+      DimensionColumnPage dimensionColumnPage, int numberOfRows) {
+    BitSet bitSet = new BitSet(numberOfRows);
     if (filterValues.length == 0) {
       return bitSet;
     }
     int startIndex = 0;
     for (int i = 0; i < filterValues.length; i++) {
-      if (startIndex >= numerOfRows) {
+      if (startIndex >= numberOfRows) {
         break;
       }
       int[] rangeIndex = CarbonUtil
-          .getRangeIndexUsingBinarySearch(dimensionColumnPage, startIndex, numerOfRows - 1,
+          .getRangeIndexUsingBinarySearch(dimensionColumnPage, startIndex, numberOfRows - 1,
               filterValues[i]);
       for (int j = rangeIndex[0]; j <= rangeIndex[1]; j++) {
         bitSet.set(dimensionColumnPage.getInvertedIndex(j));
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecutorFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecutorFilterInfo.java
index 61b9836..97be175 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecutorFilterInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecutorFilterInfo.java
@@ -35,7 +35,7 @@ import it.unimi.dsi.fastutil.shorts.ShortOpenHashSet;
  * Below class will be used to keep all the filter values based on data type
  * for measure column.
  * In this class there are multiple type of set is used to avoid conversion of
- * primitive type to primitive object to avoid gc which cause performace degrade when
+ * primitive type to primitive object to avoid gc which cause performance degrade when
  * number of records are high
  */
 public class MeasureColumnExecutorFilterInfo {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/FilterResolverIntf.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/FilterResolverIntf.java
index af33330..6453b56 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/FilterResolverIntf.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/FilterResolverIntf.java
@@ -69,10 +69,10 @@ public interface FilterResolverIntf extends Serializable {
   MeasureColumnResolvedFilterInfo getMsrColResolvedFilterInfo();
 
   /**
-   * API will return the filter executer type which will be used to evaluate
+   * API will return the filter executor type which will be used to evaluate
    * the resolved filter while query execution
    *
-   * @return FilterExecuterType.
+   * @return FilterExecutorType.
    */
   FilterExecutorType getFilterExecutorType();
 
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelFilterResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelFilterResolverImpl.java
index ded0385..3672f2d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelFilterResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelFilterResolverImpl.java
@@ -81,7 +81,7 @@ public class RowLevelFilterResolverImpl extends ConditionalFilterResolverImpl {
   }
 
   /**
-   * This method will provide the executer type to the callee inorder to identify
+   * This method will provide the executor type to the callee inorder to identify
    * the executer type for the filter resolution, Row level filter executer is a
    * special executer since it get all the rows of the specified filter dimension
    * and will be send to the spark for processing
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
index 16bd417..76bfb33 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
@@ -317,8 +317,8 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
 
   /**
    * This method will provide the executor type to the callee inorder to identify
-   * the executer type for the filter resolution, Row level filter executer is a
-   * special executer since it get all the rows of the specified filter dimension
+   * the executor type for the filter resolution, Row level filter executer is a
+   * special executor since it get all the rows of the specified filter dimension
    * and will be send to the spark for processing
    */
   public FilterExecutorType getFilterExecutorType() {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/DimColumnResolvedFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/DimColumnResolvedFilterInfo.java
index 2d53e5c..532229a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/DimColumnResolvedFilterInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/DimColumnResolvedFilterInfo.java
@@ -108,8 +108,8 @@ public class DimColumnResolvedFilterInfo extends ColumnResolvedFilterInfo implem
     return isDimensionExistsInCurrentSlice;
   }
 
-  public void setDimensionExistsInCurrentSlice(boolean isDimensionExistsInCurrentSilce) {
-    this.isDimensionExistsInCurrentSlice = isDimensionExistsInCurrentSilce;
+  public void setDimensionExistsInCurrentSlice(boolean isDimensionExistsInCurrentSlice) {
+    this.isDimensionExistsInCurrentSlice = isDimensionExistsInCurrentSlice;
   }
 
   public void populateFilterInfoBasedOnColumnType(ResolvedFilterInfoVisitorIntf visitor,
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/TrueConditionalResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/TrueConditionalResolverImpl.java
index 22add32..4833ef6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/TrueConditionalResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/TrueConditionalResolverImpl.java
@@ -37,8 +37,8 @@ public class TrueConditionalResolverImpl extends ConditionalFilterResolverImpl {
 
   /**
    * This method will provide the executor type to the callee inorder to identify
-   * the executer type for the filter resolution, Row level filter executer is a
-   * special executer since it get all the rows of the specified filter dimension
+   * the executor type for the filter resolution, Row level filter executor is a
+   * special executor since it get all the rows of the specified filter dimension
    * and will be send to the spark for processing
    */
   @Override
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
index 6a188b4..a5d7562 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java
@@ -49,7 +49,7 @@ public class ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex
    * @param nullBits Null row ordinals in the bitset
    * @param isnullBitsExists whether to consider inverted index while setting null bitset or not.
    *                          we are having nullBitset even for dimensions also.
-   *                          But some dimension columns still don't have nullbitset.
+   *                          But some dimension columns still don't have nullBitset.
    *                          So if null bitset does not exist then
    *                          it should not inverted index while setting the null
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
index 573e8b9..8946643 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
@@ -182,21 +182,7 @@ public class BlockletFilterScanner extends BlockletFullScanner {
     if (bitSetGroup.isEmpty()) {
       CarbonUtil.freeMemory(rawBlockletColumnChunks.getDimensionRawColumnChunks(),
           rawBlockletColumnChunks.getMeasureRawColumnChunks());
-
-      QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
-          .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
-      scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
-          scanTime.getCount() + (System.currentTimeMillis() - startTime));
-
-      QueryStatistic scannedBlocklets = queryStatisticsModel.getStatisticsTypeAndObjMap()
-          .get(QueryStatisticsConstants.BLOCKLET_SCANNED_NUM);
-      scannedBlocklets.addCountStatistic(QueryStatisticsConstants.BLOCKLET_SCANNED_NUM,
-          scannedBlocklets.getCount() + 1);
-
-      QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
-          .get(QueryStatisticsConstants.PAGE_SCANNED);
-      scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED,
-          scannedPages.getCount() + bitSetGroup.getScannedPages());
+      addQueryStatistic(startTime, bitSetGroup.getScannedPages());
       return createEmptyResult();
     }
 
@@ -367,21 +353,7 @@ public class BlockletFilterScanner extends BlockletFullScanner {
     if (pages.isEmpty()) {
       CarbonUtil.freeMemory(rawBlockletColumnChunks.getDimensionRawColumnChunks(),
           rawBlockletColumnChunks.getMeasureRawColumnChunks());
-
-      QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
-          .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
-      scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
-          scanTime.getCount() + (System.currentTimeMillis() - startTime));
-
-      QueryStatistic scannedBlocklets = queryStatisticsModel.getStatisticsTypeAndObjMap()
-          .get(QueryStatisticsConstants.BLOCKLET_SCANNED_NUM);
-      scannedBlocklets.addCountStatistic(QueryStatisticsConstants.BLOCKLET_SCANNED_NUM,
-          scannedBlocklets.getCount() + 1);
-
-      QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
-          .get(QueryStatisticsConstants.PAGE_SCANNED);
-      scannedPages
-          .addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED, scannedPages.getCount());
+      addQueryStatistic(startTime, 0L);
       return createEmptyResult();
     }
 
@@ -461,4 +433,21 @@ public class BlockletFilterScanner extends BlockletFullScanner {
 
     return scannedResult;
   }
+
+  private void addQueryStatistic(long startTime, long numberOfPages) {
+    QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
+    scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
+        scanTime.getCount() + (System.currentTimeMillis() - startTime));
+
+    QueryStatistic scannedBlocklets = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.BLOCKLET_SCANNED_NUM);
+    scannedBlocklets.addCountStatistic(QueryStatisticsConstants.BLOCKLET_SCANNED_NUM,
+        scannedBlocklets.getCount() + 1);
+
+    QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.PAGE_SCANNED);
+    scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED,
+        scannedPages.getCount() + numberOfPages);
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java
index 704df72..30da1e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java
+++ b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java
@@ -22,6 +22,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockIndex;
 import org.apache.carbondata.core.util.ByteUtil;
 
 /**
@@ -93,9 +94,9 @@ public class SegmentMetaDataInfoStats {
       BlockColumnMetaDataInfo previousBlockColumnMetaInfo =
           this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId);
       // compare and get updated min and max values
-      byte[][] updatedMin = compareAndUpdateMinMax(previousBlockColumnMetaInfo.getMin(),
+      byte[][] updatedMin = BlockIndex.compareAndUpdateMinMax(previousBlockColumnMetaInfo.getMin(),
           currentBlockColumnMetaInfo.getMin(), true);
-      byte[][] updatedMax = compareAndUpdateMinMax(previousBlockColumnMetaInfo.getMax(),
+      byte[][] updatedMax = BlockIndex.compareAndUpdateMinMax(previousBlockColumnMetaInfo.getMax(),
           currentBlockColumnMetaInfo.getMax(), false);
       // update the segment
       this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId)
@@ -144,24 +145,4 @@ public class SegmentMetaDataInfoStats {
     }
     return updatedMinMaxValues;
   }
-
-  private synchronized byte[][] compareAndUpdateMinMax(byte[][] minMaxValueCompare1,
-      byte[][] minMaxValueCompare2, boolean isMinValueComparison) {
-    // Compare and update min max values
-    byte[][] updatedMinMaxValues = new byte[minMaxValueCompare1.length][];
-    System.arraycopy(minMaxValueCompare1, 0, updatedMinMaxValues, 0, minMaxValueCompare1.length);
-    for (int i = 0; i < minMaxValueCompare1.length; i++) {
-      int compare = ByteUtil.UnsafeComparer.INSTANCE
-          .compareTo(minMaxValueCompare2[i], minMaxValueCompare1[i]);
-      if (isMinValueComparison) {
-        if (compare < 0) {
-          updatedMinMaxValues[i] = minMaxValueCompare2[i];
-        }
-      } else if (compare > 0) {
-        updatedMinMaxValues[i] = minMaxValueCompare2[i];
-      }
-    }
-    return updatedMinMaxValues;
-  }
-
 }
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 5c0422c..c2d74da 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -31,13 +30,9 @@ import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.datatype.DecimalType;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
 import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -74,12 +69,8 @@ public abstract class AbstractDataFileFooterConverter {
       indexReader.openThriftReader(filePath);
       // get the index header
       org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
-      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
-      List<org.apache.carbondata.format.ColumnSchema> table_columns =
-          readIndexHeader.getTable_columns();
-      for (int i = 0; i < table_columns.size(); i++) {
-        columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(table_columns.get(i)));
-      }
+      List<ColumnSchema> columnSchemaList =
+          convertColumnSchemaList(readIndexHeader.getTable_columns());
       // get the segment info
       BlockletIndex blockletIndex = null;
       int counter = 0;
@@ -119,6 +110,16 @@ public abstract class AbstractDataFileFooterConverter {
     return dataFileFooters;
   }
 
+  protected List<ColumnSchema> convertColumnSchemaList(
+      List<org.apache.carbondata.format.ColumnSchema> tableColumns) {
+    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+    List<ColumnSchema> columnSchemaList = new ArrayList<>();
+    for (org.apache.carbondata.format.ColumnSchema tableColumn : tableColumns) {
+      columnSchemaList.add(schemaConverter.fromExternalToWrapperColumnSchema(tableColumn));
+    }
+    return columnSchemaList;
+  }
+
   /**
    * Below method will be used to get the index info from index file
    *
@@ -148,12 +149,8 @@ public abstract class AbstractDataFileFooterConverter {
       }
       // get the index header
       org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
-      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
-      List<org.apache.carbondata.format.ColumnSchema> table_columns =
-          readIndexHeader.getTable_columns();
-      for (int i = 0; i < table_columns.size(); i++) {
-        columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(table_columns.get(i)));
-      }
+      List<ColumnSchema> columnSchemaList =
+          convertColumnSchemaList(readIndexHeader.getTable_columns());
       if (!isTransactionalTable) {
         QueryUtil.updateColumnUniqueIdForNonTransactionTable(columnSchemaList);
       }
@@ -280,88 +277,6 @@ public abstract class AbstractDataFileFooterConverter {
     return blockletIndex;
   }
 
-  protected ColumnSchema thriftColumnSchemaToWrapperColumnSchema(
-      org.apache.carbondata.format.ColumnSchema externalColumnSchema) {
-    ColumnSchema wrapperColumnSchema = new ColumnSchema();
-    wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
-    wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
-    DataType dataType = CarbonUtil.thriftDataTypeToWrapperDataType(externalColumnSchema.data_type);
-    if (DataTypes.isDecimal(dataType)) {
-      DecimalType decimalType = (DecimalType) dataType;
-      decimalType.setPrecision(externalColumnSchema.getPrecision());
-      decimalType.setScale(externalColumnSchema.getScale());
-    }
-    wrapperColumnSchema.setDataType(dataType);
-    wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension());
-    List<Encoding> encoders = new ArrayList<Encoding>();
-    for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) {
-      encoders.add(fromExternalToWrapperEncoding(encoder));
-    }
-    wrapperColumnSchema.setEncodingList(encoders);
-    wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child());
-    wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision());
-    wrapperColumnSchema.setScale(externalColumnSchema.getScale());
-    wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
-    Map<String, String> properties = externalColumnSchema.getColumnProperties();
-    if (properties != null) {
-      if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
-        wrapperColumnSchema.setSortColumn(true);
-      }
-    }
-    wrapperColumnSchema.setSpatialColumn(externalColumnSchema.isSpatialColumn());
-    wrapperColumnSchema.setFunction(externalColumnSchema.getAggregate_function());
-    List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation =
-        externalColumnSchema.getParentColumnTableRelations();
-    if (null != parentColumnTableRelation) {
-      wrapperColumnSchema.setParentColumnTableRelations(
-          fromThriftToWrapperParentTableColumnRelations(parentColumnTableRelation));
-    }
-    return wrapperColumnSchema;
-  }
-
-  private List<ParentColumnTableRelation> fromThriftToWrapperParentTableColumnRelations(
-      List<org.apache.carbondata.format.ParentColumnTableRelation> thriftParentColumnRelation) {
-    List<ParentColumnTableRelation> parentColumnTableRelationList = new ArrayList<>();
-    for (org.apache.carbondata.format.ParentColumnTableRelation carbonTableRelation :
-        thriftParentColumnRelation) {
-      RelationIdentifier relationIdentifier =
-          new RelationIdentifier(carbonTableRelation.getRelationIdentifier().getDatabaseName(),
-              carbonTableRelation.getRelationIdentifier().getTableName(),
-              carbonTableRelation.getRelationIdentifier().getTableId());
-      ParentColumnTableRelation parentColumnTableRelation =
-          new ParentColumnTableRelation(relationIdentifier, carbonTableRelation.getColumnId(),
-              carbonTableRelation.getColumnName());
-      parentColumnTableRelationList.add(parentColumnTableRelation);
-    }
-    return parentColumnTableRelationList;
-  }
-
-  /**
-   * Below method is convert the thrift encoding to wrapper encoding
-   *
-   * @param encoderThrift thrift encoding
-   * @return wrapper encoding
-   */
-  protected Encoding fromExternalToWrapperEncoding(
-      org.apache.carbondata.format.Encoding encoderThrift) {
-    switch (encoderThrift) {
-      case DICTIONARY:
-        return Encoding.DICTIONARY;
-      case DELTA:
-        return Encoding.DELTA;
-      case RLE:
-        return Encoding.RLE;
-      case INVERTED_INDEX:
-        return Encoding.INVERTED_INDEX;
-      case BIT_PACKED:
-        return Encoding.BIT_PACKED;
-      case DIRECT_DICTIONARY:
-        return Encoding.DIRECT_DICTIONARY;
-      default:
-        throw new IllegalArgumentException(encoderThrift.toString() + " is not supported");
-    }
-  }
-
   /**
    * Below method will be used to convert the blocklet index of thrift to
    * wrapper
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsImpl.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsImpl.java
index 7624cfc..8841935 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonLoadStatisticsImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.util;
 
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -114,34 +115,26 @@ public class CarbonLoadStatisticsImpl implements LoadStatistics {
     return lruCacheLoadTime;
   }
 
-  public void recordDictionaryValuesTotalTime(String partitionID,
-      Long dictionaryValuesTotalTimeTimePoint) {
-    if (null != parDictionaryValuesTotalTimeMap.get(partitionID)) {
-      if (null == parDictionaryValuesTotalTimeMap.get(partitionID)[0]) {
-        parDictionaryValuesTotalTimeMap.get(partitionID)[0] = dictionaryValuesTotalTimeTimePoint;
+  private void recordStatistics(Map<String, Long[]> statMap, String partitionID, Long value) {
+    if (null != statMap.get(partitionID)) {
+      if (null == statMap.get(partitionID)[0]) {
+        statMap.get(partitionID)[0] = value;
       }
-      if (null == parDictionaryValuesTotalTimeMap.get(partitionID)[1] ||
-          dictionaryValuesTotalTimeTimePoint - parDictionaryValuesTotalTimeMap.get(partitionID)[0] >
-              parDictionaryValuesTotalTimeMap.get(partitionID)[1]) {
-        parDictionaryValuesTotalTimeMap.get(partitionID)[1] = dictionaryValuesTotalTimeTimePoint -
-            parDictionaryValuesTotalTimeMap.get(partitionID)[0];
+      if (null == statMap.get(partitionID)[1] ||
+          value - statMap.get(partitionID)[0] > statMap.get(partitionID)[1]) {
+        statMap.get(partitionID)[1] = value - statMap.get(partitionID)[0];
       }
     }
   }
 
-  public void recordCsvInputStepTime(String partitionID,
-      Long csvInputStepTimePoint) {
-    if (null != parCsvInputStepTimeMap.get(partitionID)) {
-      if (null == parCsvInputStepTimeMap.get(partitionID)[0]) {
-        parCsvInputStepTimeMap.get(partitionID)[0] = csvInputStepTimePoint;
-      }
-      if (null == parCsvInputStepTimeMap.get(partitionID)[1] ||
-              csvInputStepTimePoint - parCsvInputStepTimeMap.get(partitionID)[0] >
-                      parCsvInputStepTimeMap.get(partitionID)[1]) {
-        parCsvInputStepTimeMap.get(partitionID)[1] = csvInputStepTimePoint -
-                parCsvInputStepTimeMap.get(partitionID)[0];
-      }
-    }
+  public void recordDictionaryValuesTotalTime(String partitionID,
+      Long dictionaryValuesTotalTimeTimePoint) {
+    recordStatistics(parDictionaryValuesTotalTimeMap, partitionID,
+        dictionaryValuesTotalTimeTimePoint);
+  }
+
+  public void recordCsvInputStepTime(String partitionID, Long csvInputStepTimePoint) {
+    recordStatistics(parCsvInputStepTimeMap, partitionID, csvInputStepTimePoint);
   }
 
   public void recordLruCacheLoadTime(double lruCacheLoadTime) {
@@ -150,68 +143,22 @@ public class CarbonLoadStatisticsImpl implements LoadStatistics {
 
   public void recordGeneratingDictionaryValuesTime(String partitionID,
       Long generatingDictionaryValuesTimePoint) {
-    if (null != parGeneratingDictionaryValuesTimeMap.get(partitionID)) {
-      if (null == parGeneratingDictionaryValuesTimeMap.get(partitionID)[0]) {
-        parGeneratingDictionaryValuesTimeMap.get(partitionID)[0] =
-                generatingDictionaryValuesTimePoint;
-      }
-      if (null == parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] ||
-              generatingDictionaryValuesTimePoint - parGeneratingDictionaryValuesTimeMap
-                      .get(partitionID)[0] > parGeneratingDictionaryValuesTimeMap
-                      .get(partitionID)[1]) {
-        parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] =
-                generatingDictionaryValuesTimePoint - parGeneratingDictionaryValuesTimeMap
-                        .get(partitionID)[0];
-      }
-    }
+    recordStatistics(parGeneratingDictionaryValuesTimeMap, partitionID,
+        generatingDictionaryValuesTimePoint);
   }
 
-  public void recordSortRowsStepTotalTime(String partitionID,
-                                          Long sortRowsStepTotalTimePoint) {
-    if (null != parSortRowsStepTotalTimeMap.get(partitionID)) {
-      if (null == parSortRowsStepTotalTimeMap.get(partitionID)[0]) {
-        parSortRowsStepTotalTimeMap.get(partitionID)[0] = sortRowsStepTotalTimePoint;
-      }
-      if (null == parSortRowsStepTotalTimeMap.get(partitionID)[1] ||
-              sortRowsStepTotalTimePoint - parSortRowsStepTotalTimeMap.get(partitionID)[0] >
-                      parSortRowsStepTotalTimeMap.get(partitionID)[1]) {
-        parSortRowsStepTotalTimeMap.get(partitionID)[1] = sortRowsStepTotalTimePoint -
-                parSortRowsStepTotalTimeMap.get(partitionID)[0];
-      }
-    }
+  public void recordSortRowsStepTotalTime(String partitionID, Long sortRowsStepTotalTimePoint) {
+    recordStatistics(parSortRowsStepTotalTimeMap, partitionID, sortRowsStepTotalTimePoint);
   }
 
-  public void recordMdkGenerateTotalTime(String partitionID,
-                                         Long mdkGenerateTotalTimePoint) {
-    if (null != parMdkGenerateTotalTimeMap.get(partitionID)) {
-      if (null == parMdkGenerateTotalTimeMap.get(partitionID)[0]) {
-        parMdkGenerateTotalTimeMap.get(partitionID)[0] = mdkGenerateTotalTimePoint;
-      }
-      if (null == parMdkGenerateTotalTimeMap.get(partitionID)[1] ||
-              mdkGenerateTotalTimePoint - parMdkGenerateTotalTimeMap.get(partitionID)[0] >
-                      parMdkGenerateTotalTimeMap.get(partitionID)[1]) {
-        parMdkGenerateTotalTimeMap.get(partitionID)[1] = mdkGenerateTotalTimePoint -
-                parMdkGenerateTotalTimeMap.get(partitionID)[0];
-      }
-    }
+  public void recordMdkGenerateTotalTime(String partitionID, Long mdkGenerateTotalTimePoint) {
+    recordStatistics(parMdkGenerateTotalTimeMap, partitionID, mdkGenerateTotalTimePoint);
   }
 
   public void recordDictionaryValue2MdkAdd2FileTime(String partitionID,
       Long dictionaryValue2MdkAdd2FileTimePoint) {
-    if (null != parDictionaryValue2MdkAdd2FileTime.get(partitionID)) {
-      if (null == parDictionaryValue2MdkAdd2FileTime.get(partitionID)[0]) {
-        parDictionaryValue2MdkAdd2FileTime.get(partitionID)[0] =
-                dictionaryValue2MdkAdd2FileTimePoint;
-      }
-      if (null == parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] ||
-              dictionaryValue2MdkAdd2FileTimePoint - parDictionaryValue2MdkAdd2FileTime
-                      .get(partitionID)[0] > parDictionaryValue2MdkAdd2FileTime
-                      .get(partitionID)[1]) {
-        parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] =
-                dictionaryValue2MdkAdd2FileTimePoint - parDictionaryValue2MdkAdd2FileTime
-                        .get(partitionID)[0];
-      }
-    }
+    recordStatistics(parDictionaryValue2MdkAdd2FileTime, partitionID,
+        dictionaryValue2MdkAdd2FileTimePoint);
   }
 
   //Record the node blocks information map
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 5b1b2cf..fc2ed77 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -619,20 +619,21 @@ public final class CarbonProperties {
   }
 
   private void validateEnableAutoHandoff() {
-    String offHeapSortStr = carbonProperties.getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT);
-    if (offHeapSortStr == null) {
-      carbonProperties.setProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
-          CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT);
-      offHeapSortStr = carbonProperties.getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT);
-    }
-    boolean isValidBooleanValue = CarbonUtil.validateBoolean(offHeapSortStr);
+    String autoHandoffString =
+        carbonProperties.getProperty(CarbonCommonConstants.ENABLE_AUTO_HANDOFF);
+    if (autoHandoffString == null) {
+      carbonProperties.setProperty(CarbonCommonConstants.ENABLE_AUTO_HANDOFF,
+          CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT);
+      autoHandoffString = carbonProperties.getProperty(CarbonCommonConstants.ENABLE_AUTO_HANDOFF);
+    }
+    boolean isValidBooleanValue = CarbonUtil.validateBoolean(autoHandoffString);
     if (!isValidBooleanValue) {
-      LOGGER.warn(String.format("The enable off heap sort value \"%s\" is invalid. " +
+      LOGGER.warn(String.format("The enable auto handoff value \"%s\" is invalid. " +
               "Using the default value \"%s\"",
-          offHeapSortStr,
-          CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
-      carbonProperties.setProperty(ENABLE_OFFHEAP_SORT,
-          CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT);
+          autoHandoffString,
+          CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT));
+      carbonProperties.setProperty(ENABLE_AUTO_HANDOFF,
+          CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT);
     }
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index c739553..58f6feb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -66,12 +66,7 @@ public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
       FileFooter footer = reader.readFooter();
       dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) footer.getVersion()));
       dataFileFooter.setNumberOfRows(footer.getNum_rows());
-      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
-      List<org.apache.carbondata.format.ColumnSchema> table_columns = footer.getTable_columns();
-      for (int i = 0; i < table_columns.size(); i++) {
-        columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(table_columns.get(i)));
-      }
-      dataFileFooter.setColumnInTable(columnSchemaList);
+      dataFileFooter.setColumnInTable(convertColumnSchemaList(footer.getTable_columns()));
 
       List<org.apache.carbondata.format.BlockletIndex> leaf_node_indices_Thrift =
           footer.getBlocklet_index_list();
@@ -116,7 +111,6 @@ public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
   @Override
   public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
     FileReader fileReader = null;
-    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
     try {
       long completeBlockLength = tableBlockInfo.getBlockLength();
       long footerPointer = completeBlockLength - 8;
@@ -124,16 +118,11 @@ public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
       long actualFooterOffset = fileReader.readLong(tableBlockInfo.getFilePath(), footerPointer);
       CarbonFooterReader reader =
           new CarbonFooterReader(tableBlockInfo.getFilePath(), actualFooterOffset);
-      FileFooter footer = reader.readFooter();
-      List<org.apache.carbondata.format.ColumnSchema> table_columns = footer.getTable_columns();
-      for (int i = 0; i < table_columns.size(); i++) {
-        columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(table_columns.get(i)));
-      }
+      return convertColumnSchemaList(reader.readFooter().getTable_columns());
     } finally {
       if (null != fileReader) {
         fileReader.finish();
       }
     }
-    return columnSchemaList;
   }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index f7ad18c..aea722b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -79,11 +79,7 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
     } else {
       dataFileFooter.setSorted(null);
     }
-    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
-    List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
-    for (int i = 0; i < table_columns.size(); i++) {
-      columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(table_columns.get(i)));
-    }
+    List<ColumnSchema> columnSchemaList = convertColumnSchemaList(fileHeader.getColumn_schema());
     dataFileFooter.setColumnInTable(columnSchemaList);
     List<org.apache.carbondata.format.BlockletIndex> leaf_node_indices_Thrift =
         footer.getBlocklet_index_list();
@@ -109,13 +105,7 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
   @Override
   public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
     CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(tableBlockInfo.getFilePath());
-    FileHeader fileHeader = carbonHeaderReader.readHeader();
-    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
-    List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
-    for (int i = 0; i < table_columns.size(); i++) {
-      columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(table_columns.get(i)));
-    }
-    return columnSchemaList;
+    return convertColumnSchemaList(carbonHeaderReader.readHeader().getColumn_schema());
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 3afc5da..357bb52 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -157,20 +157,7 @@ public final class DataTypeUtil {
     } else if (dataType == DataTypes.BYTE) {
       return Byte.parseByte(dimValue);
     } else if (dataType == DataTypes.TIMESTAMP) {
-      Date dateToStr = null;
-      DateFormat dateFormatter = null;
-      try {
-        if (null != timeStampFormat && !timeStampFormat.trim().isEmpty()) {
-          dateFormatter = new SimpleDateFormat(timeStampFormat);
-          dateFormatter.setLenient(false);
-        } else {
-          dateFormatter = timestampFormatter.get();
-        }
-        dateToStr = dateFormatter.parse(dimValue);
-        return dateToStr.getTime();
-      } catch (ParseException e) {
-        throw new NumberFormatException(e.getMessage());
-      }
+      return parseTimestamp(dimValue, timeStampFormat);
     } else {
       Double parsedValue = Double.valueOf(dimValue);
       if (Double.isInfinite(parsedValue) || Double.isNaN(parsedValue)) {
@@ -439,26 +426,30 @@ public final class DataTypeUtil {
     } else if (DataTypes.isDecimal(actualDataType)) {
       return new BigDecimal(dimensionValue);
     } else if (actualDataType == DataTypes.TIMESTAMP) {
-      Date dateToStr = null;
-      DateFormat dateFormatter = null;
-      try {
-        if (null != dateFormat && !dateFormat.trim().isEmpty()) {
-          dateFormatter = new SimpleDateFormat(dateFormat);
-          dateFormatter.setLenient(false);
-        } else {
-          dateFormatter = timestampFormatter.get();
-        }
-        dateToStr = dateFormatter.parse(dimensionValue);
-        return dateToStr.getTime();
-      } catch (ParseException e) {
-        throw new NumberFormatException(e.getMessage());
-      }
+      return parseTimestamp(dimensionValue, dateFormat);
     } else {
       // Default action for String/Varchar
       return converter.convertFromStringToUTF8String(dimensionValue);
     }
   }
 
+  private static Object parseTimestamp(String dimensionValue, String dateFormat) {
+    Date dateToStr;
+    DateFormat dateFormatter;
+    try {
+      if (null != dateFormat && !dateFormat.trim().isEmpty()) {
+        dateFormatter = new SimpleDateFormat(dateFormat);
+        dateFormatter.setLenient(false);
+      } else {
+        dateFormatter = timestampFormatter.get();
+      }
+      dateToStr = dateFormatter.parse(dimensionValue);
+      return dateToStr.getTime();
+    } catch (ParseException e) {
+      throw new NumberFormatException(e.getMessage());
+    }
+  }
+
   public static byte[] getBytesDataDataTypeForNoDictionaryColumn(Object dimensionValue,
       DataType actualDataType) {
     if (dimensionValue == null) {
@@ -768,39 +759,6 @@ public final class DataTypeUtil {
   }
 
   /**
-   * This method will parse a given string value corresponding to its data type
-   *
-   * @param value     value to parse
-   * @param dimension dimension to get data type and precision and scale in case of decimal
-   *                  data type
-   * @return
-   */
-  public static String normalizeColumnValueForItsDataType(String value, CarbonColumn dimension) {
-    try {
-      Object parsedValue = null;
-      // validation will not be done for timestamp datatype as for timestamp direct dictionary
-      // is generated. No dictionary file is created for timestamp datatype column
-      DataType dataType = dimension.getDataType();
-      if (DataTypes.isDecimal(dataType)) {
-        return parseStringToBigDecimal(value, dimension);
-      } else if (dataType == DataTypes.SHORT || dataType == DataTypes.INT ||
-          dataType == DataTypes.LONG) {
-        parsedValue = normalizeIntAndLongValues(value, dimension.getDataType());
-      } else if (dataType == DataTypes.DOUBLE) {
-        parsedValue = Double.parseDouble(value);
-      } else {
-        return value;
-      }
-      if (null != parsedValue) {
-        return value;
-      }
-      return null;
-    } catch (Exception e) {
-      return null;
-    }
-  }
-
-  /**
    * This method will parse a value to its datatype if datatype is decimal else will return
    * the value passed
    *
@@ -934,37 +892,6 @@ public final class DataTypeUtil {
     }
   }
 
-  /**
-   * This method will parse a given string value corresponding to its data type
-   *
-   * @param value        value to parse
-   * @param columnSchema dimension to get data type and precision and scale in case of decimal
-   *                     data type
-   * @return
-   */
-  public static String normalizeColumnValueForItsDataType(String value, ColumnSchema columnSchema) {
-    try {
-      Object parsedValue = null;
-      DataType dataType = columnSchema.getDataType();
-      if (DataTypes.isDecimal(dataType)) {
-        return parseStringToBigDecimal(value, columnSchema);
-      } else if (dataType == DataTypes.SHORT || dataType == DataTypes.INT ||
-          dataType == DataTypes.LONG) {
-        parsedValue = normalizeIntAndLongValues(value, columnSchema.getDataType());
-      } else if (dataType == DataTypes.DOUBLE) {
-        parsedValue = Double.parseDouble(value);
-      } else {
-        return value;
-      }
-      if (null != parsedValue) {
-        return value;
-      }
-      return null;
-    } catch (Exception e) {
-      return null;
-    }
-  }
-
   private static String parseStringToBigDecimal(String value, ColumnSchema columnSchema) {
     BigDecimal bigDecimal =
         new BigDecimal(value).setScale(columnSchema.getScale(), RoundingMode.HALF_UP);
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 40c346d..35ef958 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -136,17 +136,8 @@ public class CarbonIndexFileMergeWriter {
     if (null != partitionPath && !partitionTempPath.isEmpty()) {
       fileStore.readAllIIndexOfSegment(partitionTempPath);
     }
-    Map<String, byte[]> indexMap = fileStore.getCarbonIndexMapWithFullPath();
-    Map<String, Map<String, byte[]>> indexLocationMap = new HashMap<>();
-    for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) {
-      Path path = new Path(entry.getKey());
-      Map<String, byte[]> map = indexLocationMap.get(path.getParent().toString());
-      if (map == null) {
-        map = new HashMap<>();
-        indexLocationMap.put(path.getParent().toString(), map);
-      }
-      map.put(path.getName(), entry.getValue());
-    }
+    Map<String, Map<String, byte[]>> indexLocationMap =
+        groupIndexesBySegment(fileStore.getCarbonIndexMapWithFullPath());
     SegmentFileStore.FolderDetails folderDetails = null;
     for (Map.Entry<String, Map<String, byte[]>> entry : indexLocationMap.entrySet()) {
       String mergeIndexFile = writeMergeIndexFile(null, partitionPath, entry.getValue(), segmentId);
@@ -182,6 +173,17 @@ public class CarbonIndexFileMergeWriter {
     return folderDetails;
   }
 
+  private Map<String, Map<String, byte[]>> groupIndexesBySegment(Map<String, byte[]> indexMap) {
+    Map<String, Map<String, byte[]>> indexLocationMap = new HashMap<>();
+    for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) {
+      Path path = new Path(entry.getKey());
+      indexLocationMap
+          .computeIfAbsent(path.getParent().toString(), k -> new HashMap<>())
+          .put(path.getName(), entry.getValue());
+    }
+    return indexLocationMap;
+  }
+
   private String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
       boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles,
       String segmentId) throws IOException {
@@ -215,17 +217,8 @@ public class CarbonIndexFileMergeWriter {
       fileStore.readAllIIndexOfSegment(segmentFileStore.getSegmentFile(),
           segmentFileStore.getTablePath(), SegmentStatus.SUCCESS, true);
     }
-    Map<String, byte[]> indexMap = fileStore.getCarbonIndexMapWithFullPath();
-    Map<String, Map<String, byte[]>> indexLocationMap = new HashMap<>();
-    for (Map.Entry<String, byte[]> entry: indexMap.entrySet()) {
-      Path path = new Path(entry.getKey());
-      Map<String, byte[]> map = indexLocationMap.get(path.getParent().toString());
-      if (map == null) {
-        map = new HashMap<>();
-        indexLocationMap.put(path.getParent().toString(), map);
-      }
-      map.put(path.getName(), entry.getValue());
-    }
+    Map<String, Map<String, byte[]>> indexLocationMap =
+        groupIndexesBySegment(fileStore.getCarbonIndexMapWithFullPath());
     List<PartitionSpec> partitionSpecs = SegmentFileStore
         .getPartitionSpecs(segmentId, table.getTablePath(), SegmentStatusManager
             .readLoadMetadata(CarbonTablePath.getMetadataPath(table.getTablePath())));
diff --git a/core/src/test/java/org/apache/carbondata/core/keygenerator/mdkey/BitsUnitTest.java b/core/src/test/java/org/apache/carbondata/core/keygenerator/mdkey/BitsUnitTest.java
index 0ba2f39..227c314 100644
--- a/core/src/test/java/org/apache/carbondata/core/keygenerator/mdkey/BitsUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/keygenerator/mdkey/BitsUnitTest.java
@@ -40,13 +40,6 @@ public class BitsUnitTest {
     assertThat(result, is(equalTo(expected)));
   }
 
-  @Test public void testGetWithLongKeys() throws Exception {
-    long[] expected = new long[] { 0L, 0L, 103616086028L};
-    long[] keys = new long[] { 24L, 32L, 12L, 64L, 40L };
-    long[] result = bits.get(keys);
-    assertThat(result, is(equalTo(expected)));
-  }
-
   @Test public void testGetKeyByteOffsets() throws Exception {
     int[] lens = new int[] { 64, 64, 64, 64, 64 };
     Bits bits1 = new Bits(lens);
@@ -56,48 +49,6 @@ public class BitsUnitTest {
     assertThat(result, is(equalTo(expected)));
   }
 
-  @Test public void testGetKeyArray() throws Exception {
-    int[] lens = new int[] { 8, 32, 24 };
-    Bits bit1 = new Bits(lens);
-    int[] maskByteRanges = new int[] { 1, 3, 5, 6, 4, 8, 9, 2 };
-    byte[] key = new byte[] { 8, 24, 32, 24, 40, 127, 64, 16, 24, 16 };
-    long[] expected = new long[] { 24L, 410992680L, 1576992L };
-    long[] result = bit1.getKeyArray(key, maskByteRanges);
-    assertThat(result, is(equalTo(expected)));
-  }
-
-  @Test public void testGetKeyArrayWithKeyContainsNegativeValueOFByte() throws Exception {
-    int[] lens = new int[] { 8, 32, 24 };
-    Bits bit1 = new Bits(lens);
-    int[] maskByteRanges = new int[] { 1, 3, 5, 6, 4, 8, 9, 2 };
-    byte[] key = new byte[] { -8, 24, 32, -24, 40, -127, 64, 16, -24, 16 };
-    long[] expected = new long[] { 24L, 3900784680L, 15208480L };
-    long[] result = bit1.getKeyArray(key, maskByteRanges);
-    assertThat(result, is(equalTo(expected)));
-  }
-
-  @Test public void testGetKeyArrayWithByteBoundaryValue() throws Exception {
-    int[] lens = new int[] { 127, 127, 127 };
-    Bits bits1= new Bits(lens);
-    int[] maskByteRanges =
-        new int[] { 1, 3, 5, 6, 4, 8, 9, 2, 1, 3, 5, 6, 4, 8, 9, 2, 1, 3, 5, 6, 4, 8, 9, 2, 1, 3, 5,
-            6, 4, 8, 9, 2, 1, 3, 5, 6, 4, 8, 9, 2, 1, 3, 5, 6, 4, 8, 9, 2 };
-    byte[] key = new byte[] { 127, 24, 32, 127, 40, 127, 64, 16, 24, 16 };
-    long[] expected =
-        new long[] { 7061077969919295616L, 3530538984959647808L, 1765269492479823904L };
-    long[] result = bits1.getKeyArray(key, maskByteRanges);
-    assertThat(result, is(equalTo(expected)));
-  }
-
-  @Test public void testGetKeyArrayWithNullValue() throws Exception {
-    int[] lens = new int[] { 20, 35, 10 };
-    Bits bit1 = new Bits(lens);
-    byte[] key = new byte[] { 10, 20, 30, 10, 15, 10, 20, 30, 10, 15 };
-    long[] expected = new long[] { 41200, 10800497927L, 522 };
-    long[] result = bit1.getKeyArray(key, null);
-    assertThat(result, is(equalTo(expected)));
-  }
-
   @Test public void testEqualsWithBitsObject() throws Exception {
     boolean result = bits.equals(bits);
     assertEquals(true, result);