You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/06/27 09:33:47 UTC

[2/2] carbondata git commit: [CARBONDATA-2587][CARBONDATA-2588] Local Dictionary Data Loading support

[CARBONDATA-2587][CARBONDATA-2588] Local Dictionary Data Loading support

What changes are proposed in this PR

Added code to support Local Dictionary Data Loading for primitive type
Added code to support Local Dictionary Data Loading for complex type.
How was this PR tested?
Manual testing was done on a 3-node setup.
UTs will be added in a separate PR.

This closes #2402


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e7103397
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e7103397
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e7103397

Branch: refs/heads/master
Commit: e7103397d96623955ad4f55fc1d0cac7c679a8d7
Parents: 5804d75
Author: kumarvishal09 <ku...@gmail.com>
Authored: Mon Jun 4 15:41:50 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Jun 27 15:03:31 2018 +0530

----------------------------------------------------------------------
 .../dictionary/DictionaryByteArrayWrapper.java  |   4 +
 .../carbondata/core/datastore/ColumnType.java   |   6 +-
 .../blocklet/BlockletEncodedColumnPage.java     | 176 +++++++++++
 .../datastore/blocklet/EncodedBlocklet.java     | 179 +++++++++++
 ...mpressedDimensionChunkFileBasedReaderV3.java |   3 +-
 ...ndexerStorageForNoInvertedIndexForShort.java |  12 +-
 .../core/datastore/page/ColumnPage.java         |  71 +++--
 .../core/datastore/page/ComplexColumnPage.java  | 149 ++++++---
 .../page/FallbackColumnPageEncoder.java         |  84 +++++
 .../page/FallbackEncodedColumnPage.java         |  49 +++
 .../datastore/page/LocalDictColumnPage.java     | 316 +++++++++++++++++++
 .../datastore/page/SafeVarLengthColumnPage.java |  21 +-
 .../datastore/page/UnsafeDecimalColumnPage.java |  12 +-
 .../page/UnsafeVarLengthColumnPage.java         |  16 +-
 .../datastore/page/VarLengthColumnPageBase.java |  38 +--
 .../page/encoding/ColumnPageEncoder.java        |  40 ++-
 .../page/encoding/EncodedColumnPage.java        |  31 +-
 .../legacy/DictDimensionIndexCodec.java         |   2 +-
 .../legacy/DirectDictDimensionIndexCodec.java   |   2 +-
 .../legacy/HighCardDictDimensionIndexCodec.java |   8 +-
 .../page/statistics/DummyStatsCollector.java    |  88 ++++++
 .../localdictionary/PageLevelDictionary.java    | 126 ++++++++
 .../dictionaryholder/DictionaryStore.java       |  50 +++
 .../MapBasedDictionaryStore.java                | 137 ++++++++
 .../DictionaryThresholdReachedException.java    |  87 +++++
 .../ColumnLocalDictionaryGenerator.java         |  75 +++++
 .../generator/LocalDictionaryGenerator.java     |  48 +++
 .../core/metadata/schema/table/CarbonTable.java |   2 +-
 .../core/util/CarbonMetadataUtil.java           |  89 ++++--
 .../core/util/CarbonMetadataUtilTest.java       |  65 ----
 format/src/main/thrift/carbondata.thrift        |  12 +
 .../processing/datatypes/ArrayDataType.java     |  15 +-
 .../processing/datatypes/GenericDataType.java   |   4 +-
 .../processing/datatypes/PrimitiveDataType.java |   8 +-
 .../processing/datatypes/StructDataType.java    |  15 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  42 +--
 .../store/CarbonFactDataHandlerModel.java       | 102 +++++-
 .../carbondata/processing/store/TablePage.java  |  38 ++-
 .../store/writer/AbstractFactDataWriter.java    |  13 +
 .../store/writer/v3/BlockletDataHolder.java     |  32 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   |  88 +++---
 41 files changed, 2001 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java
index 03c86ac..f812f86 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java
@@ -89,4 +89,8 @@ public class DictionaryByteArrayWrapper {
     result = 31 * result;
     return result;
   }
+
+  public byte[] getData() {
+    return data;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
index 8bbf12d..080444c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
@@ -37,7 +37,9 @@ public enum ColumnType {
 
   COMPLEX_ARRAY,
 
-  COMPLEX_PRIMITIVE;
+  COMPLEX_PRIMITIVE,
+
+  PLAIN_LONG_VALUE;
 
   public static ColumnType valueOf(int ordinal) {
     if (ordinal == GLOBAL_DICTIONARY.ordinal()) {
@@ -56,6 +58,8 @@ public enum ColumnType {
       return COMPLEX_ARRAY;
     } else if (ordinal == COMPLEX_PRIMITIVE.ordinal()) {
       return COMPLEX_PRIMITIVE;
+    } else if (ordinal == PLAIN_LONG_VALUE.ordinal()) {
+      return PLAIN_LONG_VALUE;
     } else {
       throw new RuntimeException("create ColumnType with invalid ordinal: " + ordinal);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java
new file mode 100644
index 0000000..6508787
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.blocklet;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.page.FallbackColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.FallbackEncodedColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
+import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.format.LocalDictionaryChunk;
+
+/**
+ * Maintains the list of encoded pages of a column in a blocklet,
+ * and the encoded dictionary values when the column is encoded using
+ * local dictionary.
+ * Handles the fallback (re-encoding without local dictionary) when not
+ * all the pages in the blocklet could be encoded with local dictionary
+ */
+public class BlockletEncodedColumnPage {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockletEncodedColumnPage.class.getName());
+
+  /**
+   * list of encoded page of a column in a blocklet
+   */
+  private List<EncodedColumnPage> encodedColumnPageList;
+
+  /**
+   * fallback executor service
+   */
+  private ExecutorService fallbackExecutorService;
+
+  /**
+   * to check whether pages are local dictionary encoded or not
+   */
+  private boolean isLocalDictEncoded;
+
+  /**
+   * page level dictionary only when column is encoded with local dictionary
+   */
+  private PageLevelDictionary pageLevelDictionary;
+
+  /**
+   * fallback future task queue;
+   */
+  private ArrayDeque<Future<FallbackEncodedColumnPage>> fallbackFutureQueue;
+
+  BlockletEncodedColumnPage(ExecutorService fallbackExecutorService) {
+    this.fallbackExecutorService = fallbackExecutorService;
+  }
+
+  /**
+   * Below method will be used to add column page of a column
+   *
+   * @param encodedColumnPage
+   * encoded column page
+   */
+  void addEncodedColumnColumnPage(EncodedColumnPage encodedColumnPage) {
+    if (null == encodedColumnPageList) {
+      this.encodedColumnPageList = new ArrayList<>();
+      // if dimension page is local dictionary enabled and encoded with local dictionary
+      if (encodedColumnPage.isLocalDictGeneratedPage()) {
+        this.isLocalDictEncoded = true;
+        // get first page dictionary
+        this.pageLevelDictionary = encodedColumnPage.getPageDictionary();
+      }
+      encodedColumnPageList.add(encodedColumnPage);
+      return;
+    }
+    // if the pages are not local dictionary encoded, or the new page is also
+    // encoded with local dictionary, then simply add the page
+    if (!isLocalDictEncoded || encodedColumnPage.isLocalDictGeneratedPage()) {
+      this.encodedColumnPageList.add(encodedColumnPage);
+      // merge page level dictionary values
+      if (null != this.pageLevelDictionary) {
+        pageLevelDictionary.mergerDictionaryValues(encodedColumnPage.getPageDictionary());
+      }
+    } else {
+      // if older pages were encoded with dictionary and new pages are without dictionary
+      isLocalDictEncoded = false;
+      pageLevelDictionary = null;
+      this.fallbackFutureQueue = new ArrayDeque<>();
+      LOGGER.info(
+          "Local dictionary Fallback is initiated for column: " + encodedColumnPageList.get(0)
+              .getActualPage().getColumnSpec().getFieldName());
+      // submit all the older pages encoded with dictionary for fallback
+      for (int pageIndex = 0; pageIndex < encodedColumnPageList.size(); pageIndex++) {
+        fallbackFutureQueue.add(fallbackExecutorService.submit(
+            new FallbackColumnPageEncoder(encodedColumnPageList.get(pageIndex), pageIndex)));
+      }
+      //add to page list
+      this.encodedColumnPageList.add(encodedColumnPage);
+    }
+  }
+
+  /**
+   * Return the list of encoded page list for a column in a blocklet
+   *
+   * @return list of encoded page list
+   */
+  public List<EncodedColumnPage> getEncodedColumnPageList() {
+    // if fallback queue is not null then fallback was initiated for some pages
+    if (null != this.fallbackFutureQueue) {
+      try {
+        // drain the queue: poll() removes and returns the head future,
+        // get() waits for its fallback re-encoding to finish
+        while (!fallbackFutureQueue.isEmpty()) {
+          FallbackEncodedColumnPage fallbackEncodedColumnPage = fallbackFutureQueue.poll().get();
+          // replace the dictionary-encoded page with the re-encoded (fallback) page
+          // at its original page index
+          encodedColumnPageList.set(fallbackEncodedColumnPage.getPageIndex(),
+              fallbackEncodedColumnPage.getEncodedColumnPage());
+        }
+      } catch (ExecutionException | InterruptedException e) {
+        throw new RuntimeException("Problem while encoding the blocklet data during fallback", e);
+      }
+      // setting to null as all the fallback encoded page has been added to list
+      fallbackFutureQueue = null;
+    }
+    // in case of dictionary encoded column page memory will be freed only after
+    // all the pages are added in a blocklet, as fallback can happen anytime so old pages memory
+    // cannot be freed, so after encoding is done we can free the page memory
+    if (null != pageLevelDictionary) {
+      // clear the memory footprint for local dictionary encoded pages
+      for (EncodedColumnPage columnPage : encodedColumnPageList) {
+        columnPage.freeMemory();
+      }
+    }
+    return encodedColumnPageList;
+  }
+
+  /**
+   * Below method will be used to get the encoded dictionary
+   * values for local dictionary generated columns
+   *
+   * @return encoded dictionary values if column is local dictionary generated
+   */
+  public LocalDictionaryChunk getEncodedDictionary() {
+    if (null != pageLevelDictionary) {
+      try {
+        return pageLevelDictionary.getLocalDictionaryChunkForBlocklet();
+      } catch (IOException | MemoryException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
new file mode 100644
index 0000000..794c439
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.blocklet;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.page.key.TablePageKey;
+
+/**
+ * Holds the blocklet level data and metadata to be written in carbondata file
+ * For dimension pages it will check if all the pages are not encoded with dictionary
+ * then it will encode those pages for that column again
+ */
+public class EncodedBlocklet {
+
+  /**
+   * number of rows in a blocklet
+   */
+  private int blockletSize;
+
+  /**
+   * list of page metadata
+   */
+  private List<TablePageKey> pageMetadataList;
+
+  /**
+   * maintains encoded dimension data for each column
+   */
+  private List<BlockletEncodedColumnPage> encodedDimensionColumnPages;
+
+  /**
+   * maintains encoded measure data for each column
+   */
+  private List<BlockletEncodedColumnPage> encodedMeasureColumnPages;
+
+  /**
+   * fallback executor service, will used to re-encode column pages
+   */
+  private ExecutorService executorService;
+
+  /**
+   * number of pages in a blocklet
+   */
+  private int numberOfPages;
+
+  public EncodedBlocklet(ExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  /**
+   * Below method will be used to add page metadata details
+   *
+   * @param encodedTablePage
+   * encoded table page
+   */
+  private void addPageMetadata(EncodedTablePage encodedTablePage) {
+    // for first table page create new list
+    if (null == pageMetadataList) {
+      pageMetadataList = new ArrayList<>();
+    }
+    // update details
+    blockletSize += encodedTablePage.getPageSize();
+    pageMetadataList.add(encodedTablePage.getPageKey());
+    this.numberOfPages++;
+  }
+
+  /**
+   * Below method will be used to add measure column pages
+   *
+   * @param encodedTablePage
+   * encoded table page
+   */
+  private void addEncodedMeasurePage(EncodedTablePage encodedTablePage) {
+    // for first page create new list
+    if (null == encodedMeasureColumnPages) {
+      encodedMeasureColumnPages = new ArrayList<>();
+      // adding measure pages
+      for (int i = 0; i < encodedTablePage.getNumMeasures(); i++) {
+        BlockletEncodedColumnPage blockletEncodedColumnPage = new BlockletEncodedColumnPage(null);
+        blockletEncodedColumnPage.addEncodedColumnColumnPage(encodedTablePage.getMeasure(i));
+        encodedMeasureColumnPages.add(blockletEncodedColumnPage);
+      }
+    } else {
+      for (int i = 0; i < encodedTablePage.getNumMeasures(); i++) {
+        encodedMeasureColumnPages.get(i).addEncodedColumnColumnPage(encodedTablePage.getMeasure(i));
+      }
+    }
+  }
+
+  /**
+   * Below method will be used to add dimension column pages
+   *
+   * @param encodedTablePage
+   * encoded table page
+   */
+  private void addEncodedDimensionPage(EncodedTablePage encodedTablePage) {
+    // for first page create new list
+    if (null == encodedDimensionColumnPages) {
+      encodedDimensionColumnPages = new ArrayList<>();
+      // adding dimension pages
+      for (int i = 0; i < encodedTablePage.getNumDimensions(); i++) {
+        BlockletEncodedColumnPage blockletEncodedColumnPage =
+            new BlockletEncodedColumnPage(executorService);
+        blockletEncodedColumnPage.addEncodedColumnColumnPage(encodedTablePage.getDimension(i));
+        encodedDimensionColumnPages.add(blockletEncodedColumnPage);
+      }
+    } else {
+      for (int i = 0; i < encodedTablePage.getNumDimensions(); i++) {
+        encodedDimensionColumnPages.get(i)
+            .addEncodedColumnColumnPage(encodedTablePage.getDimension(i));
+      }
+    }
+  }
+
+  /**
+   * Use to add table pages
+   *
+   * @param encodedTablePage
+   * encoded table page
+   */
+  public void addEncodedTablePage(EncodedTablePage encodedTablePage) {
+    addPageMetadata(encodedTablePage);
+    addEncodedDimensionPage(encodedTablePage);
+    addEncodedMeasurePage(encodedTablePage);
+  }
+
+  public int getBlockletSize() {
+    return blockletSize;
+  }
+
+  public List<TablePageKey> getPageMetadataList() {
+    return pageMetadataList;
+  }
+
+  public List<BlockletEncodedColumnPage> getEncodedDimensionColumnPages() {
+    return encodedDimensionColumnPages;
+  }
+
+  public List<BlockletEncodedColumnPage> getEncodedMeasureColumnPages() {
+    return encodedMeasureColumnPages;
+  }
+
+  public int getNumberOfDimension() {
+    return encodedDimensionColumnPages.size();
+  }
+
+  public int getNumberOfMeasure() {
+    return encodedMeasureColumnPages.size();
+  }
+
+  public int getNumberOfPages() {
+    return this.numberOfPages;
+  }
+
+  public void clear() {
+    this.numberOfPages = 0;
+    this.encodedDimensionColumnPages = null;
+    this.blockletSize = 0;
+    this.encodedMeasureColumnPages = null;
+    this.pageMetadataList = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index 782a8df..fee114d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -242,7 +242,8 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
   }
 
   private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage,
-      ByteBuffer pageData, DataChunk2 pageMetadata, int offset) {
+      ByteBuffer pageData, DataChunk2 pageMetadata, int offset) throws IOException,
+      MemoryException {
     byte[] dataPage;
     int[] rlePage;
     int[] invertedIndexes = new int[0];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
index 911a260..99a7e57 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
@@ -37,11 +37,15 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
   private byte[] max;
 
   public BlockIndexerStorageForNoInvertedIndexForShort(byte[][] dataPage,
-      boolean isNoDictonary) {
+      boolean isNoDictonary, boolean isVarchar) {
     this.dataPage = dataPage;
     min = this.dataPage[0];
     max = this.dataPage[0];
     totalSize += this.dataPage[0].length;
+    int lVFormatLength = 2;
+    if (isVarchar) {
+      lVFormatLength = 4;
+    }
     int minCompare = 0;
     int maxCompare = 0;
     if (!isNoDictonary) {
@@ -60,9 +64,11 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
       for (int i = 1; i < this.dataPage.length; i++) {
         totalSize += this.dataPage[i].length;
         minCompare = ByteUtil.UnsafeComparer.INSTANCE
-            .compareTo(min, 2, min.length - 2, this.dataPage[i], 2, this.dataPage[i].length - 2);
+            .compareTo(min, lVFormatLength, min.length - lVFormatLength, this.dataPage[i],
+                lVFormatLength, this.dataPage[i].length - lVFormatLength);
         maxCompare = ByteUtil.UnsafeComparer.INSTANCE
-            .compareTo(max, 2, max.length - 2, this.dataPage[i], 2, this.dataPage[i].length - 2);
+            .compareTo(max, lVFormatLength, max.length - lVFormatLength, this.dataPage[i],
+                lVFormatLength, this.dataPage[i].length - lVFormatLength);
         if (minCompare > 0) {
           min = this.dataPage[i];
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 4dcf514..4ff1330 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -30,6 +30,8 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
 import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -59,7 +61,7 @@ public abstract class ColumnPage {
   private BitSet nullBitSet;
 
   // statistics collector for this column page
-  private ColumnPageStatsCollector statsCollector;
+  protected ColumnPageStatsCollector statsCollector;
 
   protected static final boolean unsafe = Boolean.parseBoolean(CarbonProperties.getInstance()
       .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
@@ -79,32 +81,8 @@ public abstract class ColumnPage {
     return dataType;
   }
 
-  private static final SimpleStatsResult statsForComplexType = new SimpleStatsResult() {
-    @Override public Object getMin() {
-      return new byte[0];
-    }
-
-    @Override public Object getMax() {
-      return new byte[0];
-    }
-
-    @Override public int getDecimalCount() {
-      return 0;
-    }
-
-    @Override public DataType getDataType() {
-      return BYTE_ARRAY;
-    }
-
-  };
-
   public SimpleStatsResult getStatistics() {
-    if (statsCollector != null) {
-      return statsCollector.getPageStats();
-    } else {
-      // TODO: for sub column of complex type, there no stats yet, return a dummy result
-      return statsForComplexType;
-    }
+    return statsCollector.getPageStats();
   }
 
   public int getPageSize() {
@@ -184,6 +162,19 @@ public abstract class ColumnPage {
     return newPage(columnSpec, dataType, pageSize);
   }
 
+  public static ColumnPage newLocalDictPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
+      int pageSize, LocalDictionaryGenerator localDictionaryGenerator) throws MemoryException {
+    if (unsafe) {
+      return new LocalDictColumnPage(new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize),
+          new UnsafeVarLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize),
+          localDictionaryGenerator);
+    } else {
+      return new LocalDictColumnPage(new SafeVarLengthColumnPage(columnSpec, dataType, pageSize),
+          new SafeVarLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize),
+          localDictionaryGenerator);
+    }
+  }
+
   /**
    * Create a new page of dataType and number of row = pageSize
    */
@@ -675,6 +666,9 @@ public abstract class ColumnPage {
    */
   public abstract void convertValue(ColumnPageValueConverter codec);
 
+  public PageLevelDictionary getPageDictionary() {
+    throw new UnsupportedOperationException("Operation Not Supported");
+  }
   /**
    * Compress page data using specified compressor
    */
@@ -702,7 +696,9 @@ public abstract class ColumnPage {
       return compressor.compressByte(getComplexChildrenLVFlattenedBytePage());
     } else if (dataType == DataTypes.BYTE_ARRAY && (
         columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT
-            || columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY)) {
+            || columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY
+            || columnSpec.getColumnType() == ColumnType.PLAIN_LONG_VALUE
+            || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
       return compressor.compressByte(getComplexParentFlattenedBytePage());
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       return compressor.compressByte(getLVFlattenedBytePage());
@@ -742,8 +738,9 @@ public abstract class ColumnPage {
     } else if (storeDataType == DataTypes.DOUBLE) {
       double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
       return newDoublePage(columnSpec, doubleData);
-    } else if (storeDataType == DataTypes.BYTE_ARRAY
-        && columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
+    } else if (storeDataType == DataTypes.BYTE_ARRAY && (
+        columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE
+            || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
       return newComplexLVBytesPage(columnSpec, lvVarBytes,
           CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
@@ -756,6 +753,10 @@ public abstract class ColumnPage {
         && columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
       return newFixedByteArrayPage(columnSpec, lvVarBytes, CarbonCommonConstants.LONG_SIZE_IN_BYTE);
+    } else if (storeDataType == DataTypes.BYTE_ARRAY
+        && columnSpec.getColumnType() == ColumnType.PLAIN_LONG_VALUE) {
+      byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
+      return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE);
     } else if (storeDataType == DataTypes.BYTE_ARRAY) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
       return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE);
@@ -816,4 +817,16 @@ public abstract class ColumnPage {
   public TableSpec.ColumnSpec getColumnSpec() {
     return columnSpec;
   }
+
+  public boolean isLocalDictGeneratedPage() {
+    return false;
+  }
+
+  public void disableLocalDictEncoding() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  public PageLevelDictionary getColumnPageDictionary() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
index 07dc837..c6b650f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -17,74 +17,129 @@
 
 package org.apache.carbondata.core.datastore.page;
 
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
-import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.datastore.ColumnType;
-
-// Represent a complex column page, e.g. Array, Struct type column
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+/**
+ * holds the complex columndata and its children data
+ */
 public class ComplexColumnPage {
 
-  // Holds data for all rows in this page in columnar layout.
-  // After the complex data expand, it is of type byte[][], the first level array in the byte[][]
-  // representing a sub-column in the complex type, which can be retrieved by giving the depth
-  // of the complex type.
-  // TODO: further optimize it to make it more memory efficient
-  private List<ArrayList<byte[]>> complexColumnData;
-
-  // depth is the number of column after complex type is expanded. It is from 1 to N
-  private final int pageSize;
-
+  /**
+   * number of columns
+   */
   private int depth;
 
+  /**
+   * type of each column
+   */
   private List<ColumnType> complexColumnType;
 
-  public ComplexColumnPage(int pageSize, List<ColumnType> complexColumnType) {
-    this.pageSize = pageSize;
+  /**
+   * column page for each type
+   */
+  private ColumnPage[] columnPages;
+
+  /**
+   * to maintain the number of records added for each column type
+   */
+  private int[] currentRowIdList;
+
+  public ComplexColumnPage(List<ColumnType> complexColumnType) {
     this.depth = complexColumnType.size();
-    complexColumnData = new ArrayList<>(depth);
-    for (int i = 0; i < depth; i++) {
-      complexColumnData.add(new ArrayList<byte[]>());
-    }
     this.complexColumnType = complexColumnType;
+    this.columnPages = new ColumnPage[this.depth];
+    this.currentRowIdList = new int[depth];
   }
 
-  public void putComplexData(int rowId, int depth, List<byte[]> value) {
-    assert (depth <= this.depth);
-    ArrayList<byte[]> subColumnPage = complexColumnData.get(depth);
-    subColumnPage.addAll(value);
-  }
-
-  // iterate on the sub-column after complex type is expanded, return columnar page of
-  // each sub-column
-  public Iterator<byte[][]> iterator() {
-
-    return new CarbonIterator<byte[][]>() {
-      private int index = 0;
-      @Override public boolean hasNext() {
-        return index < depth;
-      }
-
-      @Override public byte[][] next() {
-        // convert the subColumnPage from ArrayList<byte[]> to byte[][]
-        ArrayList<byte[]> subColumnPage = complexColumnData.get(index);
-        index++;
-        return subColumnPage.toArray(new byte[subColumnPage.size()][]);
+  /**
+   * Below method will be used to initialize the column page of complex type
+   * @param columnToDictMap
+   * dictionary map
+   * @param columnNames
+   * list of columns
+   * @param pageSize
+   * number of records
+   * @throws MemoryException
+   * if memory is not sufficient
+   */
+  public void initialize(Map<String, LocalDictionaryGenerator> columnToDictMap,
+      List<String> columnNames, int pageSize) throws MemoryException {
+    for (int i = 0; i < this.columnPages.length; i++) {
+      LocalDictionaryGenerator localDictionaryGenerator = columnToDictMap.get(columnNames.get(i));
+      if (null == localDictionaryGenerator) {
+        TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
+            .newInstance(columnNames.get(i), DataTypes.BYTE_ARRAY, complexColumnType.get(i));
+        this.columnPages[i] = ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, pageSize);
+        this.columnPages[i].setStatsCollector(new DummyStatsCollector());
+      } else {
+        TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
+            .newInstance(columnNames.get(i), DataTypes.BYTE_ARRAY, complexColumnType.get(i));
+        this.columnPages[i] = ColumnPage
+            .newLocalDictPage(spec, DataTypes.BYTE_ARRAY, pageSize, localDictionaryGenerator);
+        this.columnPages[i].setStatsCollector(new DummyStatsCollector());
       }
-    };
+    }
   }
 
+  /**
+   *
+   * @return depth
+   */
   public int getDepth() {
     return depth;
   }
 
-  public int getPageSize() {
-    return pageSize;
-  }
-
+  /**
+   * return the type of complex column
+   * @param isDepth depth index of the column
+   * @return complex column type
+   */
   public ColumnType getComplexColumnType(int isDepth) {
     return complexColumnType.get(isDepth);
   }
+
+  /**
+   * method to add complex column data
+   * @param depth
+   * depth of column
+   * @param dataList
+   * dataList
+   */
+  public void putComplexData(int depth, List<byte[]> dataList) {
+    assert (depth <= this.depth);
+    int currentNumber = currentRowIdList[depth];
+    for (int i = 0; i < dataList.size(); i++) {
+      columnPages[depth].putData(currentNumber, dataList.get(i));
+      currentNumber++;
+    }
+    currentRowIdList[depth] = currentNumber;
+  }
+
+  /**
+   * to free the used memory
+   */
+  public void freeMemory() {
+    for (int i = 0; i < depth; i++) {
+      columnPages[i].freeMemory();
+    }
+  }
+
+  /**
+   * return the column page
+   * @param depth
+   * depth of column
+   * @return column page
+   */
+  public ColumnPage getColumnPage(int depth) {
+    assert (depth <= this.depth);
+    return columnPages[depth];
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java
new file mode 100644
index 0000000..32846a1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.page;
+
+import java.util.concurrent.Callable;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
+
+/**
+ * Below class will be used to encode column pages for which local dictionary was generated
+ * but not all the pages in the blocklet were encoded with local dictionary.
+ * This is required because all the pages of a column in a blocklet must either
+ * all be local dictionary encoded or all be encoded without local dictionary.
+ */
+public class FallbackColumnPageEncoder implements Callable<FallbackEncodedColumnPage> {
+
+  /**
+   * actual local dictionary generated column page
+   */
+  private EncodedColumnPage encodedColumnPage;
+
+  /**
+   * actual index of the page within the blocklet.
+   * This is required because, in a blocklet, some pages may be local dictionary
+   * encoded while others use plain encoding; the index identifies which
+   * local dictionary encoded page is being re-encoded during fallback
+   */
+  private int pageIndex;
+
+  public FallbackColumnPageEncoder(EncodedColumnPage encodedColumnPage, int pageIndex) {
+    this.encodedColumnPage = encodedColumnPage;
+    this.pageIndex = pageIndex;
+  }
+
+  @Override public FallbackEncodedColumnPage call() throws Exception {
+    // disable encoding using local dictionary
+    encodedColumnPage.getActualPage().disableLocalDictEncoding();
+    // new encoded column page
+    EncodedColumnPage newEncodedColumnPage;
+
+    // get column spec for existing column page
+    TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec();
+    switch (columnSpec.getColumnType()) {
+      case COMPLEX_ARRAY:
+      case COMPLEX_PRIMITIVE:
+      case COMPLEX_STRUCT:
+      case COMPLEX:
+        // for complex type column
+        newEncodedColumnPage = ColumnPageEncoder.encodedColumn(
+            encodedColumnPage.getActualPage());
+        break;
+      default:
+        // for primitive column
+        ColumnPageEncoder columnPageEncoder = DefaultEncodingFactory.getInstance()
+            .createEncoder(encodedColumnPage.getActualPage().getColumnSpec(),
+                encodedColumnPage.getActualPage());
+        newEncodedColumnPage = columnPageEncoder.encode(encodedColumnPage.getActualPage());
+    }
+    FallbackEncodedColumnPage fallbackEncodedColumnPage =
+        new FallbackEncodedColumnPage(newEncodedColumnPage, pageIndex);
+    // here freeing the memory of raw column page as fallback is done and column page will not
+    // be used.
+    // This is required to free the memory once it is of no use
+    encodedColumnPage.freeMemory();
+    return fallbackEncodedColumnPage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackEncodedColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackEncodedColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackEncodedColumnPage.java
new file mode 100644
index 0000000..9ce87b5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackEncodedColumnPage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.page;
+
+import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
+
+/**
+ * Maintains the fallback encoded page and metadata
+ */
+public class FallbackEncodedColumnPage {
+
+  /**
+   * encode page
+   */
+  private EncodedColumnPage encodedColumnPage;
+
+  /**
+   * page index in a blocklet
+   */
+  private int pageIndex;
+
+  public FallbackEncodedColumnPage(EncodedColumnPage encodedColumnPage, int pageIndex) {
+    this.encodedColumnPage = encodedColumnPage;
+    this.pageIndex = pageIndex;
+  }
+
+  public EncodedColumnPage getEncodedColumnPage() {
+    return encodedColumnPage;
+  }
+
+  public int getPageIndex() {
+    return pageIndex;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
new file mode 100644
index 0000000..2c7d3a7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
+import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Column page implementation for Local dictionary generated columns
+ * It is a decorator over two column pages:
+ * 1. Which will hold the actual data
+ * 2. Which will hold the dictionary encoded data
+ */
+public class LocalDictColumnPage extends ColumnPage {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LocalDictColumnPage.class.getName());
+
+  /**
+   * to maintain page level dictionary for column page
+   */
+  private PageLevelDictionary pageLevelDictionary;
+
+  /**
+   * to hold the actual data of the column
+   */
+  private ColumnPage actualDataColumnPage;
+
+  /**
+   * to hold the dictionary encoded column page
+   */
+  private ColumnPage encodedDataColumnPage;
+
+  /**
+   * to check if actual column page memory is already clear
+   */
+  private boolean isActualPageMemoryFreed;
+
+  /**
+   * Create a new column page with input data type and page size.
+   */
+  protected LocalDictColumnPage(ColumnPage actualDataColumnPage, ColumnPage encodedColumnpage,
+      LocalDictionaryGenerator localDictionaryGenerator) {
+    super(actualDataColumnPage.getColumnSpec(), actualDataColumnPage.getDataType(),
+        actualDataColumnPage.getPageSize());
+    // if threshold is not reached then create page level dictionary
+    // for encoding with local dictionary
+    if (!localDictionaryGenerator.isThresholdReached()) {
+      pageLevelDictionary = new PageLevelDictionary(localDictionaryGenerator,
+          actualDataColumnPage.getColumnSpec().getFieldName(), actualDataColumnPage.getDataType());
+      this.encodedDataColumnPage = encodedColumnpage;
+    } else {
+      // else free the encoded column page memory as its of no use
+      encodedColumnpage.freeMemory();
+    }
+    this.actualDataColumnPage = actualDataColumnPage;
+  }
+
+  @Override public byte[][] getByteArrayPage() {
+    if (null != pageLevelDictionary) {
+      return encodedDataColumnPage.getByteArrayPage();
+    } else {
+      return actualDataColumnPage.getByteArrayPage();
+    }
+  }
+
+  /**
+   * Below method will be used to check whether page is local dictionary
+   * generated or not. This will be used while encoding the page
+   *
+   * @return
+   */
+  public boolean isLocalDictGeneratedPage() {
+    return null != pageLevelDictionary;
+  }
+
+  /**
+   * Below method will be used to add column data to page
+   *
+   * @param rowId row number
+   * @param bytes actual data
+   */
+  @Override public void putBytes(int rowId, byte[] bytes) {
+    if (null != pageLevelDictionary) {
+      try {
+        actualDataColumnPage.putBytes(rowId, bytes);
+        int dictionaryValue = pageLevelDictionary.getDictionaryValue(bytes);
+        encodedDataColumnPage.putBytes(rowId, ByteUtil.toBytes(dictionaryValue));
+      } catch (DictionaryThresholdReachedException e) {
+        LOGGER.error(e, "Local Dictionary threshold reached for the column: " + actualDataColumnPage
+            .getColumnSpec().getFieldName());
+        pageLevelDictionary = null;
+        encodedDataColumnPage.freeMemory();
+        encodedDataColumnPage = null;
+      }
+    } else {
+      actualDataColumnPage.putBytes(rowId, bytes);
+    }
+  }
+
+  @Override public void disableLocalDictEncoding() {
+    pageLevelDictionary = null;
+    freeEncodedColumnPage();
+  }
+
+  @Override public PageLevelDictionary getColumnPageDictionary() {
+    return pageLevelDictionary;
+  }
+
+  @Override public void setBytePage(byte[] byteData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setShortPage(short[] shortData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setShortIntPage(byte[] shortIntData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setIntPage(int[] intData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setLongPage(long[] longData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setFloatPage(float[] floatData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setDoublePage(double[] doubleData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setByteArrayPage(byte[][] byteArray) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void freeMemory() {
+    if (null == pageLevelDictionary) {
+      actualDataColumnPage.freeMemory();
+      isActualPageMemoryFreed = true;
+    }
+  }
+
+  public void freeMemoryForce() {
+    if (!isActualPageMemoryFreed) {
+      actualDataColumnPage.freeMemory();
+      isActualPageMemoryFreed = true;
+    }
+    freeEncodedColumnPage();
+  }
+
+  private void freeEncodedColumnPage() {
+    if (null != encodedDataColumnPage) {
+      encodedDataColumnPage.freeMemory();
+      encodedDataColumnPage = null;
+    }
+  }
+
+  @Override public void putByte(int rowId, byte value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putShort(int rowId, short value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putLong(int rowId, long value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putDouble(int rowId, double value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putDecimal(int rowId, BigDecimal decimal) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putShortInt(int rowId, int value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public byte getByte(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public short getShort(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public int getShortInt(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public int getInt(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public long getLong(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public float getFloat(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public double getDouble(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public BigDecimal getDecimal(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public byte[] getBytes(int rowId) {
+    return actualDataColumnPage.getBytes(rowId);
+  }
+
+  @Override public byte[] getBytePage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public short[] getShortPage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public byte[] getShortIntPage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public int[] getIntPage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public long[] getLongPage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public float[] getFloatPage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public double[] getDoublePage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public byte[] getLVFlattenedBytePage() throws IOException {
+    if (null != encodedDataColumnPage) {
+      return encodedDataColumnPage.getLVFlattenedBytePage();
+    } else {
+      return actualDataColumnPage.getLVFlattenedBytePage();
+    }
+  }
+
+  @Override public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+    if (null != encodedDataColumnPage) {
+      return encodedDataColumnPage.getComplexChildrenLVFlattenedBytePage();
+    } else {
+      return actualDataColumnPage.getComplexChildrenLVFlattenedBytePage();
+    }
+  }
+
+  @Override public byte[] getComplexParentFlattenedBytePage() throws IOException {
+    if (null != encodedDataColumnPage) {
+      return encodedDataColumnPage.getComplexParentFlattenedBytePage();
+    } else {
+      return actualDataColumnPage.getComplexParentFlattenedBytePage();
+    }
+  }
+
+  @Override public byte[] getDecimalPage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void convertValue(ColumnPageValueConverter codec) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index 7b1ad20..c2eb40c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -21,6 +21,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -28,11 +30,11 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   // for string and decimal data
-  private byte[][] byteArrayData;
+  private List<byte[]> byteArrayData;
 
   SafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
     super(columnSpec, dataType, pageSize);
-    byteArrayData = new byte[pageSize][];
+    byteArrayData = new ArrayList<>();
   }
 
   @Override
@@ -42,13 +44,12 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   public void putBytesAtRow(int rowId, byte[] bytes) {
-    byteArrayData[rowId] = bytes;
+    byteArrayData.add(bytes);
   }
 
   @Override
   public void putBytes(int rowId, byte[] bytes, int offset, int length) {
-    byteArrayData[rowId] = new byte[length];
-    System.arraycopy(bytes, offset, byteArrayData[rowId], 0, length);
+    byteArrayData.add(bytes);
   }
 
   @Override public void putDecimal(int rowId, BigDecimal decimal) {
@@ -62,12 +63,14 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   public byte[] getBytes(int rowId) {
-    return byteArrayData[rowId];
+    return byteArrayData.get(rowId);
   }
 
   @Override
   public void setByteArrayPage(byte[][] byteArray) {
-    byteArrayData = byteArray;
+    for (byte[] data : byteArray) {
+      byteArrayData.add(data);
+    }
   }
 
   @Override
@@ -104,12 +107,12 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   public byte[][] getByteArrayPage() {
-    return byteArrayData;
+    return byteArrayData.toArray(new byte[byteArrayData.size()][]);
   }
 
   @Override
   void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
-    System.arraycopy(byteArrayData[rowId], 0, dest, destOffset, length);
+    System.arraycopy(byteArrayData.get(rowId), 0, dest, destOffset, length);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
index 1cdefc8..7449da6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
@@ -168,7 +168,7 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
       throw new RuntimeException(e);
     }
     CarbonUnsafe.getUnsafe().copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET + offset, baseAddress,
-        baseOffset + rowOffset[rowId], length);
+        baseOffset + rowOffset.get(rowId), length);
   }
 
   @Override
@@ -193,9 +193,9 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
 
   @Override
   public byte[] getBytes(int rowId) {
-    int length = rowOffset[rowId + 1] - rowOffset[rowId];
+    int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId);
     byte[] bytes = new byte[length];
-    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId),
         bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
     return bytes;
   }
@@ -242,9 +242,9 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
     } else if (dataType == DataTypes.LONG) {
       value = getLong(rowId);
     } else {
-      int length = rowOffset[rowId + 1] - rowOffset[rowId];
+      int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId);
       byte[] bytes = new byte[length];
-      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], bytes,
+      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId), bytes,
           CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
       return decimalConverter.getDecimal(bytes);
     }
@@ -253,7 +253,7 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
 
   @Override
   void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
-    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], dest,
+    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId), dest,
         CarbonUnsafe.BYTE_ARRAY_OFFSET + destOffset, length);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index 9d6e161..f60e505 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -65,7 +65,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
       throw new RuntimeException(e);
     }
     CarbonUnsafe.getUnsafe().copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET + offset,
-        baseAddress, baseOffset + rowOffset[rowId], length);
+        baseAddress, baseOffset + rowOffset.get(rowId), length);
   }
 
   @Override
@@ -89,20 +89,20 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   public byte[] getBytes(int rowId) {
-    int length = rowOffset[rowId + 1] - rowOffset[rowId];
+    int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId);
     byte[] bytes = new byte[length];
-    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId),
         bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
     return bytes;
   }
 
   @Override
   public byte[][] getByteArrayPage() {
-    byte[][] bytes = new byte[pageSize][];
-    for (int rowId = 0; rowId < pageSize; rowId++) {
-      int length = rowOffset[rowId + 1] - rowOffset[rowId];
+    byte[][] bytes = new byte[rowOffset.size() - 1][];
+    for (int rowId = 0; rowId < rowOffset.size() - 1; rowId++) {
+      int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId);
       byte[] rowData = new byte[length];
-      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId),
           rowData, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
       bytes[rowId] = rowData;
     }
@@ -111,7 +111,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
-    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId),
         dest, CarbonUnsafe.BYTE_ARRAY_OFFSET + destOffset, length);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index cb907a5..bd49b94 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -53,7 +53,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   Object baseAddress;
 
   // the offset of row in the unsafe memory, its size is pageSize + 1
-  int[] rowOffset;
+  List<Integer> rowOffset;
 
   // the length of bytes added in the page
   int totalLength;
@@ -66,7 +66,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
   VarLengthColumnPageBase(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
     super(columnSpec, dataType, pageSize);
-    rowOffset = new int[pageSize + 1];
+    rowOffset = new ArrayList<>();
     totalLength = 0;
   }
 
@@ -160,9 +160,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
     // set total length and rowOffset in page
     page.totalLength = offset;
-    page.rowOffset = new int[rowId + 1];
-    for (int i = 0; i < rowId + 1; i++) {
-      page.rowOffset[i] = rowOffset.get(i);
+    page.rowOffset = new ArrayList<>();
+    for (int i = 0; i < rowOffset.size(); i++) {
+      page.rowOffset.add(rowOffset.get(i));
     }
     for (int i = 0; i < rowId; i++) {
       page.putBytes(i, lvEncodedBytes, i * size, size);
@@ -240,9 +240,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
     // set total length and rowOffset in page
     page.totalLength = offset;
-    page.rowOffset = new int[rowId + 1];
-    for (int i = 0; i < rowId + 1; i++) {
-      page.rowOffset[i] = rowOffset.get(i);
+    page.rowOffset = new ArrayList<>();
+    for (int i = 0; i < rowOffset.size(); i++) {
+      page.rowOffset.add(rowOffset.get(i));
     }
 
     // set data in page
@@ -296,9 +296,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
           + " exceed this limit at rowId " + rowId);
     }
     if (rowId == 0) {
-      rowOffset[0] = 0;
+      rowOffset.add(0);
     }
-    rowOffset[rowId + 1] = rowOffset[rowId] + bytes.length;
+    rowOffset.add(rowOffset.get(rowId) + bytes.length);
     putBytesAtRow(rowId, bytes);
     totalLength += bytes.length;
   }
@@ -379,7 +379,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     int offset = 0;
     byte[] data = new byte[totalLength];
     for (int rowId = 0; rowId < pageSize; rowId++) {
-      int length = rowOffset[rowId + 1] - rowOffset[rowId];
+      int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId);
       copyBytes(rowId, data, offset, length);
       offset += length;
     }
@@ -395,9 +395,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   public byte[] getLVFlattenedBytePage() throws IOException {
     // output LV encoded byte array
     int offset = 0;
-    byte[] data = new byte[totalLength + pageSize * 4];
-    for (int rowId = 0; rowId < pageSize; rowId++) {
-      int length = rowOffset[rowId + 1] - rowOffset[rowId];
+    byte[] data = new byte[totalLength + ((rowOffset.size() - 1) * 4)];
+    for (int rowId = 0; rowId < rowOffset.size() - 1; rowId++) {
+      int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId);
       ByteUtil.setInt(data, offset, length);
       copyBytes(rowId, data, offset + 4, length);
       offset += 4 + length;
@@ -408,9 +408,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   @Override public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
     // output LV encoded byte array
     int offset = 0;
-    byte[] data = new byte[totalLength + pageSize * 2];
-    for (int rowId = 0; rowId < pageSize; rowId++) {
-      short length = (short) (rowOffset[rowId + 1] - rowOffset[rowId]);
+    byte[] data = new byte[totalLength + ((rowOffset.size() - 1) * 2)];
+    for (int rowId = 0; rowId < rowOffset.size() - 1; rowId++) {
+      short length = (short) (rowOffset.get(rowId + 1) - rowOffset.get(rowId));
       ByteUtil.setShort(data, offset, length);
       copyBytes(rowId, data, offset + 2, length);
       offset += 2 + length;
@@ -423,8 +423,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     // output LV encoded byte array
     int offset = 0;
     byte[] data = new byte[totalLength];
-    for (int rowId = 0; rowId < pageSize; rowId++) {
-      short length = (short) (rowOffset[rowId + 1] - rowOffset[rowId]);
+    for (int rowId = 0; rowId < rowOffset.size() - 1; rowId++) {
+      short length = (short) (rowOffset.get(rowId + 1) - rowOffset.get(rowId));
       copyBytes(rowId, data, offset, length);
       offset += length;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index 8bff5cc..f53024a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -22,11 +22,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.ColumnType;
-import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -39,6 +36,8 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
+import org.apache.carbondata.format.LocalDictionaryChunk;
+import org.apache.carbondata.format.LocalDictionaryChunkMeta;
 import org.apache.carbondata.format.PresenceMeta;
 
 public abstract class ColumnPageEncoder {
@@ -56,7 +55,7 @@ public abstract class ColumnPageEncoder {
   public EncodedColumnPage encode(ColumnPage inputPage) throws IOException, MemoryException {
     byte[] encodedBytes = encodeData(inputPage);
     DataChunk2 pageMetadata = buildPageMetadata(inputPage, encodedBytes);
-    return new EncodedColumnPage(pageMetadata, encodedBytes, inputPage.getStatistics());
+    return new EncodedColumnPage(pageMetadata, encodedBytes, inputPage);
   }
 
   private DataChunk2 buildPageMetadata(ColumnPage inputPage, byte[] encodedBytes)
@@ -138,22 +137,39 @@ public abstract class ColumnPageEncoder {
       throws IOException, MemoryException {
     EncodedColumnPage[] encodedPages = new EncodedColumnPage[input.getDepth()];
     int index = 0;
-    Iterator<byte[][]> iterator = input.iterator();
-    while (iterator.hasNext()) {
-      byte[][] subColumnPage = iterator.next();
-      encodedPages[index] = encodeChildColumn(subColumnPage, input.getComplexColumnType(index));
+    while (index < input.getDepth()) {
+      ColumnPage subColumnPage = input.getColumnPage(index);
+      encodedPages[index] = encodedColumn(subColumnPage);
       index++;
     }
     return encodedPages;
   }
 
-  private static EncodedColumnPage encodeChildColumn(byte[][] data, ColumnType complexDataType)
+  public static EncodedColumnPage encodedColumn(ColumnPage page)
       throws IOException, MemoryException {
-    TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
-        .newInstance("complex_inner_column", DataTypes.BYTE_ARRAY, complexDataType);
-    ColumnPage page = ColumnPage.wrapByteArrayPage(spec, data);
     ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null);
     return encoder.encode(page);
   }
 
+  /**
+   * Encodes the given local-dictionary column page and wraps the result,
+   * together with its encoding metadata, into a {@link LocalDictionaryChunk}.
+   * @param dictionaryPage
+   * dictionary column page
+   * @return local dictionary chunk
+   * @throws IOException
+   * in case of problem while encoding
+   * @throws MemoryException
+   */
+  public LocalDictionaryChunk encodeDictionary(ColumnPage dictionaryPage)
+      throws IOException, MemoryException {
+    LocalDictionaryChunk localDictionaryChunk = new LocalDictionaryChunk();
+    localDictionaryChunk.setDictionary_data(encodeData(dictionaryPage));
+    LocalDictionaryChunkMeta localDictionaryChunkMeta = new LocalDictionaryChunkMeta();
+    localDictionaryChunkMeta.setEncoders(getEncodingList());
+    localDictionaryChunkMeta.setEncoder_meta(buildEncoderMeta(dictionaryPage));
+    localDictionaryChunk.setDictionary_meta(localDictionaryChunkMeta);
+    return localDictionaryChunk;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
index 43d6fc6..6f78d95 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
@@ -19,7 +19,10 @@ package org.apache.carbondata.core.datastore.page.encoding;
 
 import java.nio.ByteBuffer;
 
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.LocalDictColumnPage;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 
@@ -34,8 +37,7 @@ public class EncodedColumnPage {
   // metadata of this page
   private DataChunk2 pageMetadata;
 
-  // stats of this page
-  private SimpleStatsResult stats;
+  private ColumnPage actualPage;
 
   /**
    * Constructor
@@ -43,7 +45,7 @@ public class EncodedColumnPage {
    * @param encodedData encoded data for this page
    */
   public EncodedColumnPage(DataChunk2 pageMetadata, byte[] encodedData,
-      SimpleStatsResult stats) {
+      ColumnPage actualPage) {
     if (pageMetadata == null) {
       throw new IllegalArgumentException("data chunk2 must not be null");
     }
@@ -52,7 +54,7 @@ public class EncodedColumnPage {
     }
     this.pageMetadata = pageMetadata;
     this.encodedData = encodedData;
-    this.stats = stats;
+    this.actualPage = actualPage;
   }
 
   /**
@@ -76,6 +78,25 @@ public class EncodedColumnPage {
   }
 
   public SimpleStatsResult getStats() {
-    return stats;
+    return actualPage.getStatistics();
+  }
+
+  public ColumnPage getActualPage() {
+    return actualPage;
+  }
+
+  public boolean isLocalDictGeneratedPage() {
+    return actualPage.isLocalDictGeneratedPage();
+  }
+
+  public PageLevelDictionary getPageDictionary() {
+    return actualPage.getColumnPageDictionary();
+  }
+
+  public void freeMemory() {
+    if (actualPage instanceof LocalDictColumnPage) {
+      LocalDictColumnPage page = (LocalDictColumnPage) actualPage;
+      page.freeMemoryForce();
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
index d157654..5694817 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
@@ -51,7 +51,7 @@ public class DictDimensionIndexCodec extends IndexStorageCodec {
         if (isInvertedIndex) {
           indexStorage = new BlockIndexerStorageForShort(data, true, false, isSort);
         } else {
-          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false, false);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         super.compressedDataPage = compressor.compressByte(flattened);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
index 1e5015b..17a523c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
@@ -52,7 +52,7 @@ public class DirectDictDimensionIndexCodec extends IndexStorageCodec {
         if (isInvertedIndex) {
           indexStorage = new BlockIndexerStorageForShort(data, false, false, isSort);
         } else {
-          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false, false);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         super.compressedDataPage = compressor.compressByte(flattened);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
index 741dbfe..c68f394 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
@@ -55,10 +55,12 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
       protected void encodeIndexStorage(ColumnPage input) {
         IndexStorage indexStorage;
         byte[][] data = input.getByteArrayPage();
+        boolean isDictionary = input.isLocalDictGeneratedPage();
         if (isInvertedIndex) {
-          indexStorage = new BlockIndexerStorageForShort(data, false, true, isSort);
+          indexStorage = new BlockIndexerStorageForShort(data, isDictionary, !isDictionary, isSort);
         } else {
-          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
+          indexStorage =
+              new BlockIndexerStorageForNoInvertedIndexForShort(data, !isDictionary, false);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         super.compressedDataPage = compressor.compressByte(flattened);
@@ -75,8 +77,6 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
         }
         return encodings;
       }
-
     };
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java
new file mode 100644
index 0000000..a8bc5f1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE_ARRAY;
+
+/**
+ * Column Page dummy stats collector. This will be used for which stats generation
+ * is not required for example complex type column
+ */
+public class DummyStatsCollector implements ColumnPageStatsCollector {
+
+  /**
+   * dummy stats used to sync with encoder
+   */
+  protected static final SimpleStatsResult DUMMY_STATS = new SimpleStatsResult() {
+    @Override public Object getMin() {
+      return new byte[0];
+    }
+
+    @Override public Object getMax() {
+      return new byte[0];
+    }
+
+    @Override public int getDecimalCount() {
+      return 0;
+    }
+
+    @Override public DataType getDataType() {
+      return BYTE_ARRAY;
+    }
+
+  };
+
+  @Override public void updateNull(int rowId) {
+
+  }
+
+  @Override public void update(byte value) {
+
+  }
+
+  @Override public void update(short value) {
+
+  }
+
+  @Override public void update(int value) {
+
+  }
+
+  @Override public void update(long value) {
+
+  }
+
+  @Override public void update(double value) {
+
+  }
+
+  @Override public void update(BigDecimal value) {
+
+  }
+
+  @Override public void update(byte[] value) {
+
+  }
+
+  @Override public SimpleStatsResult getPageStats() {
+    return DUMMY_STATS;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
new file mode 100644
index 0000000..3ea36ef
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.localdictionary;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.datastore.ColumnType;
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
+import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector;
+import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.format.LocalDictionaryChunk;
+
+/**
+ * Class to maintain page level dictionary. It will store all unique dictionary values
+ * used in a page. This is required while writing blocklet level dictionary in carbondata
+ * file
+ */
+public class PageLevelDictionary {
+
+  /**
+   * dictionary generator to generate dictionary values for page data
+   */
+  private LocalDictionaryGenerator localDictionaryGenerator;
+
+  /**
+   * set of dictionary surrogate key in this page
+   */
+  private BitSet usedDictionaryValues;
+
+  private String columnName;
+
+  private DataType dataType;
+
+  public PageLevelDictionary(LocalDictionaryGenerator localDictionaryGenerator, String columnName,
+      DataType dataType) {
+    this.localDictionaryGenerator = localDictionaryGenerator;
+    this.usedDictionaryValues = new BitSet();
+    this.columnName = columnName;
+    this.dataType = dataType;
+  }
+
+  /**
+   * Below method will be used to get the dictionary value
+   *
+   * @param data column data
+   * @return dictionary value
+   * @throws DictionaryThresholdReachedException when threshold crossed for column
+   */
+  public int getDictionaryValue(byte[] data) throws DictionaryThresholdReachedException {
+    int dictionaryValue = localDictionaryGenerator.generateDictionary(data);
+    this.usedDictionaryValues.set(dictionaryValue);
+    return dictionaryValue;
+  }
+
+  /**
+   * Method to merge the dictionary values used across pages. NOTE(review): the body calls BitSet.and(), which INTERSECTS the two usage sets; a merge/union of values used in either page would require or() — dictionary keys used in only one page are dropped as written. Confirm intended semantics.
+   *
+   * @param pageLevelDictionary other page level dictionary
+   */
+  public void mergerDictionaryValues(PageLevelDictionary pageLevelDictionary) {
+    usedDictionaryValues.and(pageLevelDictionary.usedDictionaryValues);
+  }
+
+  /**
+   * Below method will be used to get the local dictionary chunk for writing
+   * TODO: Support for numeric data type dictionary exclude columns
+   * @return encoded local dictionary chunk
+   * @throws MemoryException
+   * in case of problem in encoding
+   * @throws IOException
+   * in case of problem in encoding
+   */
+  public LocalDictionaryChunk getLocalDictionaryChunkForBlocklet()
+      throws MemoryException, IOException {
+    // TODO support for actual data type dictionary ColumnSPEC
+    ColumnType columnType = ColumnType.PLAIN_VALUE;
+    if (DataTypes.VARCHAR == dataType) {
+      columnType = ColumnType.PLAIN_LONG_VALUE;
+    }
+    TableSpec.ColumnSpec spec =
+        TableSpec.ColumnSpec.newInstance(columnName, DataTypes.BYTE_ARRAY, columnType);
+    ColumnPage dictionaryColumnPage =
+        ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, usedDictionaryValues.cardinality());
+    // TODO support data type specific stats collector for numeric data types
+    dictionaryColumnPage.setStatsCollector(new DummyStatsCollector());
+    int rowId = 0;
+    for (int i = usedDictionaryValues.nextSetBit(0);
+         i >= 0; i = usedDictionaryValues.nextSetBit(i + 1)) {
+      dictionaryColumnPage
+          .putData(rowId++, localDictionaryGenerator.getDictionaryKeyBasedOnValue(i));
+    }
+    // creating a encoder
+    ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null);
+    // get encoded dictionary values
+    LocalDictionaryChunk localDictionaryChunk = encoder.encodeDictionary(dictionaryColumnPage);
+    // set compressed dictionary values
+    localDictionaryChunk.setDictionary_values(CompressorFactory.getInstance().getCompressor()
+        .compressByte(usedDictionaryValues.toByteArray()));
+    // free the dictionary page memory
+    dictionaryColumnPage.freeMemory();
+    return localDictionaryChunk;
+  }
+}