You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/06/29 08:48:04 UTC

carbondata git commit: [CARBONDATA-2669] Local Dictionary Store Size optimisation and other function issues

Repository: carbondata
Updated Branches:
  refs/heads/master dac8a4b00 -> 334e64778


[CARBONDATA-2669] Local Dictionary Store Size optimisation and other function issues

Problems
Local dictionary store size issue.
When all column data is empty and the columns are not present in the sort columns, the local dictionary store size was larger than the no-dictionary store size.
Page level dictionary merging Issue
While merging the used dictionary values of the pages in a blocklet, some of the dictionary values were missed because an AND operation was performed on the bitsets.
Local Dictionary null values
The null value was not added in LV (length-value) format; because of this, new dictionary values were being generated for null values.
Local dictionary generator thread specific
Solution:
Added RLE for unsorted dictionary values to reduce the size.
An OR operation is now performed while merging the dictionary values.
Added LV for null values
Local dictionary generator task specific

This closes #2427


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/334e6477
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/334e6477
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/334e6477

Branch: refs/heads/master
Commit: 334e6477892f84016c2f4078e8eb562a1e6ec68d
Parents: dac8a4b
Author: kumarvishal09 <ku...@gmail.com>
Authored: Thu Jun 28 22:49:37 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Jun 29 14:17:52 2018 +0530

----------------------------------------------------------------------
 .../columnar/BlockIndexerStorageForInt.java     | 246 -------------------
 ...kIndexerStorageForNoInvertedIndexForInt.java | 118 ---------
 ...ndexerStorageForNoInvertedIndexForShort.java | 128 +++++-----
 .../columnar/BlockIndexerStorageForShort.java   |  17 --
 .../core/datastore/columnar/IndexStorage.java   |  14 --
 .../legacy/DictDimensionIndexCodec.java         |   2 +-
 .../legacy/DirectDictDimensionIndexCodec.java   |   2 +-
 .../legacy/HighCardDictDimensionIndexCodec.java |   5 +-
 .../localdictionary/PageLevelDictionary.java    |   2 +-
 .../ColumnLocalDictionaryGenerator.java         |  10 +-
 .../apache/carbondata/core/util/CarbonUtil.java |  49 ++++
 .../CarbonRowDataWriterProcessorStepImpl.java   |   8 +
 .../steps/DataWriterBatchProcessorStepImpl.java |   7 +
 .../steps/DataWriterProcessorStepImpl.java      |  16 +-
 .../store/CarbonFactDataHandlerModel.java       |  47 +---
 15 files changed, 159 insertions(+), 512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForInt.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForInt.java
deleted file mode 100644
index 27194bb..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForInt.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.columnar;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.ByteUtil;
-
-public class BlockIndexerStorageForInt implements IndexStorage<int[]> {
-  private boolean alreadySorted;
-
-  private int[] rowIdPage;
-
-  private int[] rowIdRlePage;
-
-  private byte[][] dataPage;
-
-  private int[] dataRlePage;
-
-  private int totalSize;
-
-  public BlockIndexerStorageForInt(byte[][] dataPage, boolean rleOnData, boolean isNoDictionary,
-      boolean isSortRequired) {
-    ColumnWithRowId<Integer>[] dataWithRowId = createColumnWithRowId(dataPage, isNoDictionary);
-    if (isSortRequired) {
-      Arrays.sort(dataWithRowId);
-    }
-    int[] rowIds = extractDataAndReturnRowIds(dataWithRowId, dataPage);
-    rleEncodeOnRowId(rowIds);
-    if (rleOnData) {
-      rleEncodeOnData(dataWithRowId);
-    }
-  }
-
-  /**
-   * Create an object with each column array and respective rowId
-   */
-  private ColumnWithRowId<Integer>[] createColumnWithRowId(byte[][] dataPage,
-      boolean isNoDictionary) {
-    ColumnWithRowId<Integer>[] columnWithRowId = new ColumnWithRowId[dataPage.length];
-    if (isNoDictionary) {
-      for (int i = 0; i < columnWithRowId.length; i++) {
-        columnWithRowId[i] = new ColumnWithRowIdForHighCard<>(dataPage[i], i);
-      }
-    } else {
-      for (int i = 0; i < columnWithRowId.length; i++) {
-        columnWithRowId[i] = new ColumnWithRowId<>(dataPage[i], i);
-      }
-    }
-    return columnWithRowId;
-  }
-
-  private int[] extractDataAndReturnRowIds(ColumnWithRowId<Integer>[] dataWithRowId,
-      byte[][] keyBlock) {
-    int[] rowId = new int[dataWithRowId.length];
-    for (int i = 0; i < rowId.length; i++) {
-      rowId[i] = dataWithRowId[i].getIndex();
-      keyBlock[i] = dataWithRowId[i].getColumn();
-    }
-    this.dataPage = keyBlock;
-    return rowId;
-  }
-
-  /**
-   * It compresses depends up on the sequence numbers.
-   * [1,2,3,4,6,8,10,11,12,13] is translated to [1,4,6,8,10,13] and [0,6]. In
-   * first array the start and end of sequential numbers and second array
-   * keeps the indexes of where sequential numbers starts. If there is no
-   * sequential numbers then the same array it returns with empty second
-   * array.
-   *
-   * @param rowIds
-   */
-  public void rleEncodeOnRowId(int[] rowIds) {
-    List<Integer> list = new ArrayList<Integer>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    List<Integer> map = new ArrayList<Integer>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    int k = 0;
-    int i = 1;
-    for (; i < rowIds.length; i++) {
-      if (rowIds[i] - rowIds[i - 1] == 1) {
-        k++;
-      } else {
-        if (k > 0) {
-          map.add((list.size()));
-          list.add(rowIds[i - k - 1]);
-          list.add(rowIds[i - 1]);
-        } else {
-          list.add(rowIds[i - 1]);
-        }
-        k = 0;
-      }
-    }
-    if (k > 0) {
-      map.add((list.size()));
-      list.add(rowIds[i - k - 1]);
-      list.add(rowIds[i - 1]);
-    } else {
-      list.add(rowIds[i - 1]);
-    }
-    rowIdPage = convertToArray(list);
-    if (rowIds.length == rowIdPage.length) {
-      rowIdRlePage = new int[0];
-    } else {
-      rowIdRlePage = convertToArray(map);
-    }
-    if (rowIdPage.length == 2 && rowIdRlePage.length == 1) {
-      alreadySorted = true;
-    }
-  }
-
-  private int[] convertToArray(List<Integer> list) {
-    int[] shortArray = new int[list.size()];
-    for (int i = 0; i < shortArray.length; i++) {
-      shortArray[i] = list.get(i);
-    }
-    return shortArray;
-  }
-
-  /**
-   * @return the alreadySorted
-   */
-  public boolean isAlreadySorted() {
-    return alreadySorted;
-  }
-
-  /**
-   * @return the rowIdPage
-   */
-  public int[] getRowIdPage() {
-    return rowIdPage;
-  }
-
-  @Override
-  public int getRowIdPageLengthInBytes() {
-    if (rowIdPage != null) {
-      return rowIdPage.length * 4;
-    } else {
-      return 0;
-    }
-  }
-
-  /**
-   * @return the rowIdRlePage
-   */
-  public int[] getRowIdRlePage() {
-    return rowIdRlePage;
-  }
-
-  @Override
-  public int getRowIdRlePageLengthInBytes() {
-    if (rowIdRlePage != null) {
-      return rowIdRlePage.length * 4;
-    } else {
-      return 0;
-    }
-  }
-
-  /**
-   * @return the dataPage
-   */
-  public byte[][] getDataPage() {
-    return dataPage;
-  }
-
-  private void rleEncodeOnData(ColumnWithRowId[] dataWithRowId) {
-    byte[] prvKey = dataWithRowId[0].getColumn();
-    List<ColumnWithRowId> list =
-        new ArrayList<ColumnWithRowId>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    list.add(dataWithRowId[0]);
-    int counter = 1;
-    int start = 0;
-    List<Integer> map = new ArrayList<Integer>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    for (int i = 1; i < dataWithRowId.length; i++) {
-      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(prvKey, dataWithRowId[i].getColumn()) != 0) {
-        prvKey = dataWithRowId[i].getColumn();
-        list.add(dataWithRowId[i]);
-        map.add(start);
-        map.add(counter);
-        start += counter;
-        counter = 1;
-        continue;
-      }
-      counter++;
-    }
-    map.add(start);
-    map.add(counter);
-    this.dataPage = convertToDataPage(list);
-    if (dataWithRowId.length == dataPage.length) {
-      dataRlePage = new int[0];
-    } else {
-      dataRlePage = convertToArray(map);
-    }
-  }
-
-  private byte[][] convertToDataPage(List<ColumnWithRowId> list) {
-    byte[][] shortArray = new byte[list.size()][];
-    for (int i = 0; i < shortArray.length; i++) {
-      shortArray[i] = list.get(i).getColumn();
-      totalSize += shortArray[i].length;
-    }
-    return shortArray;
-  }
-
-  public int[] getDataRlePage() {
-    return dataRlePage;
-  }
-
-  @Override
-  public int getDataRlePageLengthInBytes() {
-    if (dataRlePage != null) {
-      return dataRlePage.length * 4;
-    } else {
-      return 0;
-    }
-  }
-
-  @Override public int getTotalSize() {
-    return totalSize;
-  }
-
-  @Override public byte[] getMin() {
-    return dataPage[0];
-  }
-
-  @Override public byte[] getMax() {
-    return dataPage[dataPage.length - 1];
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForInt.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForInt.java
deleted file mode 100644
index 218694f..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForInt.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.datastore.columnar;
-
-import org.apache.carbondata.core.util.ByteUtil;
-
-/**
- * Below class will be used to for no inverted index
- */
-public class BlockIndexerStorageForNoInvertedIndexForInt implements IndexStorage<int[]> {
-
-  /**
-   * column data
-   */
-  private byte[][] dataPage;
-
-  /**
-   * total number of rows
-   */
-  private int totalSize;
-
-  private byte[] min;
-  private byte[] max;
-
-  public BlockIndexerStorageForNoInvertedIndexForInt(byte[][] dataPage) {
-    this.dataPage = dataPage;
-    min = this.dataPage[0];
-    max = this.dataPage[0];
-    totalSize += this.dataPage[0].length;
-    int minCompare = 0;
-    int maxCompare = 0;
-    for (int i = 1; i < this.dataPage.length; i++) {
-      totalSize += this.dataPage[i].length;
-      minCompare = ByteUtil.compare(min, this.dataPage[i]);
-      maxCompare = ByteUtil.compare(max, this.dataPage[i]);
-      if (minCompare > 0) {
-        min = this.dataPage[i];
-      }
-      if (maxCompare < 0) {
-        max = this.dataPage[i];
-      }
-    }
-  }
-
-  public int[] getDataRlePage() {
-    return new int[0];
-  }
-
-  @Override
-  public int getDataRlePageLengthInBytes() {
-    return 0;
-  }
-
-  @Override public int getTotalSize() {
-    return totalSize;
-  }
-
-  @Override public boolean isAlreadySorted() {
-    return true;
-  }
-
-  /**
-   * no use
-   *
-   * @return
-   */
-  public int[] getRowIdPage() {
-    return new int[0];
-  }
-
-  @Override
-  public int getRowIdPageLengthInBytes() {
-    return 0;
-  }
-
-  /**
-   * no use
-   *
-   * @return
-   */
-  public int[] getRowIdRlePage() {
-    return new int[0];
-  }
-
-  @Override
-  public int getRowIdRlePageLengthInBytes() {
-    return 0;
-  }
-
-  /**
-   * @return the dataPage
-   */
-  public byte[][] getDataPage() {
-    return dataPage;
-  }
-
-  @Override public byte[] getMin() {
-    return min;
-  }
-
-  @Override public byte[] getMax() {
-    return max;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
index 99a7e57..bbb3434 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
@@ -16,6 +16,10 @@
  */
 package org.apache.carbondata.core.datastore.columnar;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.ByteUtil;
 
 /**
@@ -28,72 +32,78 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
    */
   private byte[][] dataPage;
 
-  /**
-   * total number of rows
-   */
-  private int totalSize;
-
-  private byte[] min;
-  private byte[] max;
+  private short[] dataRlePage;
 
-  public BlockIndexerStorageForNoInvertedIndexForShort(byte[][] dataPage,
-      boolean isNoDictonary, boolean isVarchar) {
+  public BlockIndexerStorageForNoInvertedIndexForShort(byte[][] dataPage, boolean applyRLE) {
     this.dataPage = dataPage;
-    min = this.dataPage[0];
-    max = this.dataPage[0];
-    totalSize += this.dataPage[0].length;
-    int lVFormatLength = 2;
-    if (isVarchar) {
-      lVFormatLength = 4;
+    if (applyRLE) {
+      List<byte[]> actualDataList = new ArrayList<>();
+      for (int i = 0; i < dataPage.length; i++) {
+        actualDataList.add(dataPage[i]);
+      }
+      rleEncodeOnData(actualDataList);
     }
-    int minCompare = 0;
-    int maxCompare = 0;
-    if (!isNoDictonary) {
-      for (int i = 1; i < this.dataPage.length; i++) {
-        totalSize += this.dataPage[i].length;
-        minCompare = ByteUtil.compare(min, this.dataPage[i]);
-        maxCompare = ByteUtil.compare(max, this.dataPage[i]);
-        if (minCompare > 0) {
-          min = this.dataPage[i];
-        }
-        if (maxCompare < 0) {
-          max = this.dataPage[i];
-        }
+  }
+
+  private void rleEncodeOnData(List<byte[]> actualDataList) {
+    byte[] prvKey = actualDataList.get(0);
+    List<byte[]> list = new ArrayList<>(actualDataList.size() / 2);
+    list.add(actualDataList.get(0));
+    short counter = 1;
+    short start = 0;
+    List<Short> map = new ArrayList<>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    for (int i = 1; i < actualDataList.size(); i++) {
+      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(prvKey, actualDataList.get(i)) != 0) {
+        prvKey = actualDataList.get(i);
+        list.add(actualDataList.get(i));
+        map.add(start);
+        map.add(counter);
+        start += counter;
+        counter = 1;
+        continue;
       }
+      counter++;
+    }
+    map.add(start);
+    map.add(counter);
+    // if rle is index size is more than 70% then rle wont give any benefit
+    // so better to avoid rle index and write data as it is
+    boolean useRle = (((list.size() + map.size()) * 100) / actualDataList.size()) < 70;
+    if (useRle) {
+      this.dataPage = convertToDataPage(list);
+      dataRlePage = convertToArray(map);
     } else {
-      for (int i = 1; i < this.dataPage.length; i++) {
-        totalSize += this.dataPage[i].length;
-        minCompare = ByteUtil.UnsafeComparer.INSTANCE
-            .compareTo(min, lVFormatLength, min.length - lVFormatLength, this.dataPage[i],
-                lVFormatLength, this.dataPage[i].length - lVFormatLength);
-        maxCompare = ByteUtil.UnsafeComparer.INSTANCE
-            .compareTo(max, lVFormatLength, max.length - lVFormatLength, this.dataPage[i],
-                lVFormatLength, this.dataPage[i].length - lVFormatLength);
-        if (minCompare > 0) {
-          min = this.dataPage[i];
-        }
-        if (maxCompare < 0) {
-          max = this.dataPage[i];
-        }
-      }
+      this.dataPage = convertToDataPage(actualDataList);
+      dataRlePage = new short[0];
     }
   }
 
-  public short[] getDataRlePage() {
-    return new short[0];
+  private short[] convertToArray(List<Short> list) {
+    short[] shortArray = new short[list.size()];
+    for (int i = 0; i < shortArray.length; i++) {
+      shortArray[i] = list.get(i);
+    }
+    return shortArray;
   }
 
-  @Override
-  public int getDataRlePageLengthInBytes() {
-    return 0;
+  private byte[][] convertToDataPage(List<byte[]> list) {
+    byte[][] shortArray = new byte[list.size()][];
+    for (int i = 0; i < shortArray.length; i++) {
+      shortArray[i] = list.get(i);
+    }
+    return shortArray;
   }
 
-  @Override public int getTotalSize() {
-    return totalSize;
+  public short[] getDataRlePage() {
+    return dataRlePage;
   }
 
-  @Override public boolean isAlreadySorted() {
-    return true;
+  @Override public int getDataRlePageLengthInBytes() {
+    if (dataRlePage != null) {
+      return dataRlePage.length * 2;
+    } else {
+      return 0;
+    }
   }
 
   /**
@@ -105,8 +115,7 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
     return new short[0];
   }
 
-  @Override
-  public int getRowIdPageLengthInBytes() {
+  @Override public int getRowIdPageLengthInBytes() {
     return 0;
   }
 
@@ -119,8 +128,7 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
     return new short[0];
   }
 
-  @Override
-  public int getRowIdRlePageLengthInBytes() {
+  @Override public int getRowIdRlePageLengthInBytes() {
     return 0;
   }
 
@@ -130,12 +138,4 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
   public byte[][] getDataPage() {
     return dataPage;
   }
-
-  @Override public byte[] getMin() {
-    return min;
-  }
-
-  @Override public byte[] getMax() {
-    return max;
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
index a91d6bc..be6a1a7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
@@ -35,8 +35,6 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
 
   private short[] dataRlePage;
 
-  private int totalSize;
-
   public BlockIndexerStorageForShort(byte[][] dataPage, boolean rleOnData,
       boolean isNoDictionary, boolean isSortRequired) {
     ColumnWithRowId<Short>[] dataWithRowId = createColumnWithRowId(dataPage, isNoDictionary);
@@ -224,7 +222,6 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
     byte[][] shortArray = new byte[indexes.length][];
     for (int i = 0; i < shortArray.length; i++) {
       shortArray[i] = indexes[i].getColumn();
-      totalSize += shortArray[i].length;
     }
     return shortArray;
   }
@@ -233,7 +230,6 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
     byte[][] shortArray = new byte[list.size()][];
     for (int i = 0; i < shortArray.length; i++) {
       shortArray[i] = list.get(i).getColumn();
-      totalSize += shortArray[i].length;
     }
     return shortArray;
   }
@@ -250,17 +246,4 @@ public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
       return 0;
     }
   }
-
-  @Override public int getTotalSize() {
-    return totalSize;
-  }
-
-  @Override public byte[] getMin() {
-    return dataPage[0];
-  }
-
-  @Override public byte[] getMax() {
-    return dataPage[dataPage.length - 1];
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/core/src/main/java/org/apache/carbondata/core/datastore/columnar/IndexStorage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/IndexStorage.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/IndexStorage.java
index 0ef8cad..a30ea88 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/IndexStorage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/IndexStorage.java
@@ -19,8 +19,6 @@ package org.apache.carbondata.core.datastore.columnar;
 
 public interface IndexStorage<T> {
 
-  boolean isAlreadySorted();
-
   T getRowIdPage();
 
   int getRowIdPageLengthInBytes();
@@ -34,16 +32,4 @@ public interface IndexStorage<T> {
   T getDataRlePage();
 
   int getDataRlePageLengthInBytes();
-
-  int getTotalSize();
-
-  /**
-   * @return min value of block
-   */
-  byte[] getMin();
-
-  /**
-   * @return max value of block
-   */
-  byte[] getMax();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
index 5694817..d157654 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
@@ -51,7 +51,7 @@ public class DictDimensionIndexCodec extends IndexStorageCodec {
         if (isInvertedIndex) {
           indexStorage = new BlockIndexerStorageForShort(data, true, false, isSort);
         } else {
-          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false, false);
+          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         super.compressedDataPage = compressor.compressByte(flattened);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
index 17a523c..1e5015b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
@@ -52,7 +52,7 @@ public class DirectDictDimensionIndexCodec extends IndexStorageCodec {
         if (isInvertedIndex) {
           indexStorage = new BlockIndexerStorageForShort(data, false, false, isSort);
         } else {
-          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false, false);
+          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         super.compressedDataPage = compressor.compressByte(flattened);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
index c68f394..f9c124f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
@@ -60,7 +60,7 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
           indexStorage = new BlockIndexerStorageForShort(data, isDictionary, !isDictionary, isSort);
         } else {
           indexStorage =
-              new BlockIndexerStorageForNoInvertedIndexForShort(data, !isDictionary, false);
+              new BlockIndexerStorageForNoInvertedIndexForShort(data, isDictionary);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         super.compressedDataPage = compressor.compressByte(flattened);
@@ -75,6 +75,9 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
         } else if (indexStorage.getRowIdPageLengthInBytes() > 0) {
           encodings.add(Encoding.INVERTED_INDEX);
         }
+        if (indexStorage.getDataRlePageLengthInBytes() > 0) {
+          encodings.add(Encoding.RLE);
+        }
         return encodings;
       }
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
index 3ea36ef..db95b03 100644
--- a/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
@@ -81,7 +81,7 @@ public class PageLevelDictionary {
    * @param pageLevelDictionary other page level dictionary
    */
   public void mergerDictionaryValues(PageLevelDictionary pageLevelDictionary) {
-    usedDictionaryValues.and(pageLevelDictionary.usedDictionaryValues);
+    usedDictionaryValues.or(pageLevelDictionary.usedDictionaryValues);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/ColumnLocalDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/ColumnLocalDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/ColumnLocalDictionaryGenerator.java
index 5ae9e27..4c2665f 100644
--- a/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/ColumnLocalDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/ColumnLocalDictionaryGenerator.java
@@ -16,6 +16,8 @@
  */
 package org.apache.carbondata.core.localdictionary.generator;
 
+import java.nio.ByteBuffer;
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.localdictionary.dictionaryholder.DictionaryStore;
 import org.apache.carbondata.core.localdictionary.dictionaryholder.MapBasedDictionaryStore;
@@ -31,13 +33,17 @@ public class ColumnLocalDictionaryGenerator implements LocalDictionaryGenerator
    */
   private DictionaryStore dictionaryHolder;
 
-  public ColumnLocalDictionaryGenerator(int threshold) {
+  public ColumnLocalDictionaryGenerator(int threshold, int lvLength) {
     // adding 1 to threshold for null value
     int newThreshold = threshold + 1;
     this.dictionaryHolder = new MapBasedDictionaryStore(newThreshold);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(
+        lvLength + CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+    byteBuffer.putShort((short)CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+    byteBuffer.put(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
     // for handling null values
     try {
-      dictionaryHolder.putIfAbsent(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+      dictionaryHolder.putIfAbsent(byteBuffer.array());
     } catch (DictionaryThresholdReachedException e) {
       // do nothing
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 9b4962b..5da4c3a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -46,6 +46,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
+import org.apache.carbondata.core.localdictionary.generator.ColumnLocalDictionaryGenerator;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -3116,4 +3118,51 @@ public final class CarbonUtil {
       }
     }
   }
+
+  /**
+   * Builds the column-name to local dictionary generator mapping for every local dictionary
+   * column of the table. The map is empty when local dictionary is disabled for the table.
+   *
+   * @param carbonTable table whose dimension and measure column schemas are inspected
+   * @return map keyed by column name; never null
+   */
+  public static Map<String, LocalDictionaryGenerator> getLocalDictionaryModel(
+      CarbonTable carbonTable) {
+    List<ColumnSchema> wrapperColumnSchema = CarbonUtil
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
+    boolean islocalDictEnabled = carbonTable.isLocalDictionaryEnabled();
+    // the map is populated only if local dictionary is enabled, else it stays empty
+    Map<String, LocalDictionaryGenerator> columnLocalDictGenMap = new HashMap<>();
+    if (islocalDictEnabled) {
+      int localDictionaryThreshold = carbonTable.getLocalDictionaryThreshold();
+      for (ColumnSchema columnSchema : wrapperColumnSchema) {
+        // only local dictionary columns get a generator; VARCHAR passes the int size, else short
+        if (columnSchema.isLocalDictColumn()) {
+          columnLocalDictGenMap.put(columnSchema.getColumnName(),
+              new ColumnLocalDictionaryGenerator(localDictionaryThreshold,
+                  columnSchema.getDataType() == DataTypes.VARCHAR ?
+                      CarbonCommonConstants.INT_SIZE_IN_BYTE :
+                      CarbonCommonConstants.SHORT_SIZE_IN_BYTE));
+        }
+      }
+    }
+    if (islocalDictEnabled) {
+      LOGGER.info("Local dictionary is enabled for table: " + carbonTable.getTableUniqueName());
+      LOGGER.info(
+          "Local dictionary threshold for table: " + carbonTable.getTableUniqueName() + " is: "
+              + carbonTable.getLocalDictionaryThreshold());
+      Iterator<Map.Entry<String, LocalDictionaryGenerator>> iterator =
+          columnLocalDictGenMap.entrySet().iterator();
+      StringBuilder stringBuilder = new StringBuilder();
+      while (iterator.hasNext()) {
+        Map.Entry<String, LocalDictionaryGenerator> next = iterator.next();
+        stringBuilder.append(next.getKey());
+        stringBuilder.append(',');
+      }
+      LOGGER.info("Local dictionary will be generated for the columns:" + stringBuilder.toString()
+          + " for table: " + carbonTable.getTableUniqueName());
+    }
+    return columnLocalDictGenMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index e465471..1224674 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.processing.loading.steps;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -28,10 +29,12 @@ import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
@@ -76,9 +79,13 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
 
   private String tableName;
 
+  private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;
+
   public CarbonRowDataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
+    this.localDictionaryGeneratorMap =
+        CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
   }
 
   @Override public DataField[] getOutput() {
@@ -160,6 +167,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     DataMapWriterListener listener = getDataMapWriterListener(0);
     CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
         configuration, storeLocation, 0, iteratorIndex, listener);
+    model.setColumnLocalDictGenMap(localDictionaryGeneratorMap);
     CarbonFactHandler dataHandler = null;
     boolean rowsNotExist = true;
     while (iterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
index 78777ce..9a11266 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -18,12 +18,15 @@ package org.apache.carbondata.processing.loading.steps;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
@@ -47,9 +50,12 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
 
+  private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;
   public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
+    this.localDictionaryGeneratorMap =
+        CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
   }
 
   @Override public DataField[] getOutput() {
@@ -89,6 +95,7 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
             DataMapWriterListener listener = getDataMapWriterListener(0);
             CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
                 .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, k++, listener);
+            model.setColumnLocalDictGenMap(this.localDictionaryGeneratorMap);
             CarbonFactHandler dataHandler = CarbonFactHandlerFactory
                 .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
             dataHandler.initialise();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index a0f29fa..768dedb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -32,9 +33,11 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
@@ -60,13 +63,19 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   private DataMapWriterListener listener;
 
+  private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;
+
   public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
+    this.localDictionaryGeneratorMap =
+        CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
   }
 
   public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration) {
     super(configuration, null);
+    this.localDictionaryGeneratorMap =
+        CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
   }
 
   @Override public DataField[] getOutput() {
@@ -92,8 +101,10 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     String[] storeLocation = getStoreLocation(tableIdentifier);
     listener = getDataMapWriterListener(0);
-    return CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration,
-        storeLocation, 0, 0, listener);
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
+        .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, 0, listener);
+    carbonFactDataHandlerModel.setColumnLocalDictGenMap(localDictionaryGeneratorMap);
+    return carbonFactDataHandlerModel;
   }
 
   @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
@@ -169,6 +180,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     listener = getDataMapWriterListener(rangeId);
     CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
         .createCarbonFactDataHandlerModel(configuration, storeLocation, rangeId, 0, listener);
+    model.setColumnLocalDictGenMap(localDictionaryGeneratorMap);
     CarbonFactHandler dataHandler = null;
     boolean rowsNotExist = true;
     while (insideRangeIterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/334e6477/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 5b12229..6f8f987 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -29,7 +29,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.core.localdictionary.generator.ColumnLocalDictionaryGenerator;
 import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
@@ -287,7 +286,6 @@ public class CarbonFactDataHandlerModel {
     }
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
-    setLocalDictToModel(carbonTable, wrapperColumnSchema, carbonFactDataHandlerModel);
     setNumberOfCores(carbonFactDataHandlerModel);
     return carbonFactDataHandlerModel;
   }
@@ -356,9 +354,10 @@ public class CarbonFactDataHandlerModel {
             carbonFactDataHandlerModel.getTaskExtension(),
             String.valueOf(loadModel.getFactTimeStamp()),
             loadModel.getSegmentId()));
-    setLocalDictToModel(carbonTable, wrapperColumnSchema, carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     setNumberOfCores(carbonFactDataHandlerModel);
+    carbonFactDataHandlerModel
+        .setColumnLocalDictGenMap(CarbonUtil.getLocalDictionaryModel(carbonTable));
     return carbonFactDataHandlerModel;
   }
 
@@ -644,48 +643,6 @@ public class CarbonFactDataHandlerModel {
     return columnLocalDictGenMap;
   }
 
-  /**
-   * This method prepares a map which will have column and local dictionary generator mapping for
-   * all the local dictionary columns.
-   * @param carbonTable
-   * @param wrapperColumnSchema
-   * @param carbonFactDataHandlerModel
-   */
-  private static void setLocalDictToModel(CarbonTable carbonTable,
-      List<ColumnSchema> wrapperColumnSchema,
-      CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
-    boolean islocalDictEnabled = carbonTable.isLocalDictionaryEnabled();
-    // creates a map only if local dictionary is enabled, else map will be null
-    Map<String, LocalDictionaryGenerator> columnLocalDictGenMap = new HashMap<>();
-    if (islocalDictEnabled) {
-      int localDictionaryThreshold = carbonTable.getLocalDictionaryThreshold();
-      for (ColumnSchema columnSchema : wrapperColumnSchema) {
-        // check whether the column is local dictionary column or not
-        if (columnSchema.isLocalDictColumn()) {
-          columnLocalDictGenMap.put(columnSchema.getColumnName(),
-              new ColumnLocalDictionaryGenerator(localDictionaryThreshold));
-        }
-      }
-    }
-    if (islocalDictEnabled) {
-      LOGGER.info("Local dictionary is enabled for table: " + carbonTable.getTableUniqueName());
-      LOGGER.info(
-          "Local dictionary threshold for table: " + carbonTable.getTableUniqueName() + " is: "
-              + carbonTable.getLocalDictionaryThreshold());
-      Iterator<Map.Entry<String, LocalDictionaryGenerator>> iterator =
-          columnLocalDictGenMap.entrySet().iterator();
-      StringBuilder stringBuilder = new StringBuilder();
-      while (iterator.hasNext()) {
-        Map.Entry<String, LocalDictionaryGenerator> next = iterator.next();
-        stringBuilder.append(next.getKey());
-        stringBuilder.append(',');
-      }
-      LOGGER.info("Local dictionary will be generated for the columns:" + stringBuilder.toString()
-          + " for table: " + carbonTable.getTableUniqueName());
-    }
-    carbonFactDataHandlerModel.setColumnLocalDictGenMap(columnLocalDictGenMap);
-  }
-
   public void setColumnLocalDictGenMap(
       Map<String, LocalDictionaryGenerator> columnLocalDictGenMap) {
     this.columnLocalDictGenMap = columnLocalDictGenMap;