You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/06/27 09:33:46 UTC

[1/2] carbondata git commit: [CARBONDATA-2587][CARBONDATA-2588] Local Dictionary Data Loading support

Repository: carbondata
Updated Branches:
  refs/heads/master 5804d7570 -> e7103397d


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/DictionaryStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/DictionaryStore.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/DictionaryStore.java
new file mode 100644
index 0000000..226104b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/DictionaryStore.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.localdictionary.dictionaryholder;
+
+import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
+
+/**
+ * Interface for storing the dictionary key and value.
+ * Concrete implementation can be of map based or trie based.
+ */
+public interface DictionaryStore {
+
+  /**
+   * Adds the given key to the dictionary store and returns its dictionary value;
+   * if the key is already present in the store, the existing dictionary value is returned.
+   * @param key dictionary key
+   * @return dictionary value
+   * @throws DictionaryThresholdReachedException if the store's threshold has been reached
+   */
+  int putIfAbsent(byte[] key) throws DictionaryThresholdReachedException;
+
+  /**
+   * Reports whether the dictionary threshold of this store has been reached
+   * @return true if threshold of store reached
+   */
+  boolean isThresholdReached();
+
+  /**
+   * Looks up the dictionary key that was assigned the given dictionary value
+   * @param value
+   * dictionary value
+   * @return dictionary key based on value
+   */
+  byte[] getDictionaryKeyBasedOnValue(int value);
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/MapBasedDictionaryStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/MapBasedDictionaryStore.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/MapBasedDictionaryStore.java
new file mode 100644
index 0000000..05ca002
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/MapBasedDictionaryStore.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.localdictionary.dictionaryholder;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.core.cache.dictionary.DictionaryByteArrayWrapper;
+import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
+
+/**
+ * Map based dictionary holder class, it will use map to hold
+ * the dictionary key and its value
+ */
+public class MapBasedDictionaryStore implements DictionaryStore {
+
+  /**
+   * last dictionary value handed out; incremented before being assigned to a new key
+   */
+  private int lastAssignValue;
+
+  /**
+   * forward mapping from dictionary key to its assigned dictionary value
+   */
+  private final Map<DictionaryByteArrayWrapper, Integer> dictionary;
+
+  /**
+   * array used for reverse lookup (dictionary value to key), indexed by value - 1,
+   * so that a reverse lookup does not have to iterate over the map every time.
+   * It only holds references to the key wrappers stored in the map
+   */
+  private DictionaryByteArrayWrapper[] referenceDictionaryArray;
+
+  /**
+   * maximum number of dictionary values this store may assign
+   */
+  private int dictionaryThreshold;
+
+  /**
+   * set once the threshold is exceeded; NOTE(review): read outside the lock, not volatile
+   */
+  private boolean isThresholdReached;
+
+  public MapBasedDictionaryStore(int dictionaryThreshold) {
+    this.dictionaryThreshold = dictionaryThreshold;
+    this.dictionary = new ConcurrentHashMap<>();
+    this.referenceDictionaryArray = new DictionaryByteArrayWrapper[dictionaryThreshold];
+  }
+
+  /**
+   * Returns the dictionary value of the given key, assigning the next value to a new key.
+   * When a new key would exceed dictionaryThreshold the store is cleared and this throws.
+   *
+   * @param data dictionary key
+   * @return dictionary value
+   */
+  @Override public int putIfAbsent(byte[] data) throws DictionaryThresholdReachedException {
+    // fail fast if the threshold was already reached
+    checkIfThresholdReached();
+    DictionaryByteArrayWrapper key = new DictionaryByteArrayWrapper(data);
+    // look up the dictionary value without taking the lock
+    Integer value = dictionary.get(key);
+    // if value is null then the key is not present in the store yet
+    if (null == value) {
+      // acquire the lock
+      synchronized (dictionary) {
+        // re-check the threshold, another thread may have reached it meanwhile
+        checkIfThresholdReached();
+        // get the value again as another thread might have added the key
+        value = dictionary.get(key);
+        // second check, now under the lock
+        if (null == value) {
+          // assign the next dictionary value
+          value = ++lastAssignValue;
+          // if new value is greater than threshold
+          if (value > dictionaryThreshold) {
+            // drop the forward map and the reverse lookup array
+            dictionary.clear();
+            referenceDictionaryArray = null;
+            // set the threshold boolean to true
+            isThresholdReached = true;
+            // always throws here, so the array write below never sees the null array
+            checkIfThresholdReached();
+          }
+          // add to reference array
+          // index is value - 1 as dictionary values start from 1
+          this.referenceDictionaryArray[value - 1] = key;
+          dictionary.put(key, value);
+        }
+      }
+    }
+    return value;
+  }
+
+  private void checkIfThresholdReached() throws DictionaryThresholdReachedException {
+    if (isThresholdReached) {
+      throw new DictionaryThresholdReachedException(
+          "Unable to generate dictionary value. Dictionary threshold reached");
+    }
+  }
+
+  /**
+   * Reports whether the dictionary threshold of this store has been reached
+   *
+   * @return true once the threshold has been reached
+   */
+  @Override public boolean isThresholdReached() {
+    return isThresholdReached;
+  }
+
+  /**
+   * Below method will be used to get the dictionary key based on value
+   *
+   * @param value dictionary value
+   *              Caller must take care of passing a proper (already assigned) value
+   * @return dictionary key based on value
+   */
+  @Override public byte[] getDictionaryKeyBasedOnValue(int value) {
+    assert referenceDictionaryArray != null;
+    // reference array index is value - 1 as dictionary values start from 1
+    return referenceDictionaryArray[value - 1].getData();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/localdictionary/exception/DictionaryThresholdReachedException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/exception/DictionaryThresholdReachedException.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/exception/DictionaryThresholdReachedException.java
new file mode 100644
index 0000000..7d648e0
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/exception/DictionaryThresholdReachedException.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.localdictionary.exception;
+
+import java.util.Locale;
+
+public class DictionaryThresholdReachedException extends Exception {
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The error message returned by getMessage(); stays "" unless a constructor sets it.
+   */
+  private String msg = "";
+
+  /**
+   * Creates the exception with an error message.
+   *
+   * @param msg The error message for this exception.
+   */
+  public DictionaryThresholdReachedException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Creates the exception with an error message and a cause.
+   *
+   * @param msg       exception message
+   * @param throwable detail exception
+   */
+  public DictionaryThresholdReachedException(String msg, Throwable throwable) {
+    super(msg, throwable);
+    this.msg = msg;
+  }
+
+  /**
+   * Creates the exception from a cause only.
+   * msg is not set here, so getMessage() returns an empty string for such instances.
+   * @param throwable exception
+   */
+  public DictionaryThresholdReachedException(Throwable throwable) {
+    super(throwable);
+  }
+
+  /**
+   * This method is used to get the localized message.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region. It is currently ignored.
+   * @return - always an empty string; no localized message is produced yet.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * Delegates to the superclass implementation of getLocalizedMessage.
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * Returns the message stored by the constructor, or "" if none was given.
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/ColumnLocalDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/ColumnLocalDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/ColumnLocalDictionaryGenerator.java
new file mode 100644
index 0000000..5ae9e27
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/ColumnLocalDictionaryGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.localdictionary.generator;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.localdictionary.dictionaryholder.DictionaryStore;
+import org.apache.carbondata.core.localdictionary.dictionaryholder.MapBasedDictionaryStore;
+import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
+
+/**
+ * Class to generate local dictionary for column
+ */
+public class ColumnLocalDictionaryGenerator implements LocalDictionaryGenerator {
+
+  /**
+   * dictionary store holding the key/value pairs generated for this column
+   */
+  private DictionaryStore dictionaryHolder;
+
+  public ColumnLocalDictionaryGenerator(int threshold) {
+    // adding 1 to threshold for null value
+    int newThreshold = threshold + 1;
+    this.dictionaryHolder = new MapBasedDictionaryStore(newThreshold);
+    // pre-register the default member (used for null values) as the first dictionary entry
+    try {
+      dictionaryHolder.putIfAbsent(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+    } catch (DictionaryThresholdReachedException e) {
+      // not expected here: the store is new and (for threshold >= 0) has room for this entry
+    }
+  }
+
+  /**
+   * Generates (or returns the already generated) dictionary value for the given data
+   * by delegating to the dictionary store
+   * @param data data for which dictionary needs to be generated
+   * @return dictionary value
+   */
+  @Override public int generateDictionary(byte[] data) throws DictionaryThresholdReachedException {
+    int dictionaryValue =  this.dictionaryHolder.putIfAbsent(data);
+    return dictionaryValue;
+  }
+
+  /**
+   * Below method will be used to check if threshold is reached
+   * for dictionary for particular column
+   * @return true if dictionary threshold reached for column
+   */
+  @Override public boolean isThresholdReached() {
+    return this.dictionaryHolder.isThresholdReached();
+  }
+
+  /**
+   * Below method will be used to get the dictionary key based on value
+   * @param value
+   * dictionary value
+   * @return dictionary key based on value
+   */
+  @Override public byte[] getDictionaryKeyBasedOnValue(int value) {
+    return this.dictionaryHolder.getDictionaryKeyBasedOnValue(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/LocalDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/LocalDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/LocalDictionaryGenerator.java
new file mode 100644
index 0000000..553c65b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/LocalDictionaryGenerator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.localdictionary.generator;
+
+import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
+
+/**
+ * Interface for generating dictionary for column
+ */
+public interface LocalDictionaryGenerator {
+
+  /**
+   * Below method will be used to generate dictionary
+   * @param data data for which dictionary needs to be generated
+   * @return dictionary value
+   * @throws DictionaryThresholdReachedException if the dictionary threshold has been reached
+   */
+  int generateDictionary(byte[] data) throws DictionaryThresholdReachedException;
+
+  /**
+   * Below method will be used to check if threshold is reached
+   * for dictionary for particular column
+   * @return true if dictionary threshold reached for column
+   */
+  boolean isThresholdReached();
+
+  /**
+   * Below method will be used to get the dictionary key based on value
+   * @param value
+   * dictionary value
+   * @return dictionary key based on value
+   */
+  byte[] getDictionaryKeyBasedOnValue(int value);
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 2cb19ea..68bd749 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -482,7 +482,7 @@ public class CarbonTable implements Serializable {
    * @return
    */
   public boolean isLocalDictionaryEnabled() {
-    return isLocalDictionaryEnabled;
+    return false;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index af5121c..58de030 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -23,7 +23,9 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.blocklet.BlockletEncodedColumnPage;
+import org.apache.carbondata.core.datastore.blocklet.EncodedBlocklet;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.datastore.page.statistics.TablePageStatistics;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -44,6 +46,7 @@ import org.apache.carbondata.format.Encoding;
 import org.apache.carbondata.format.FileFooter3;
 import org.apache.carbondata.format.FileHeader;
 import org.apache.carbondata.format.IndexHeader;
+import org.apache.carbondata.format.LocalDictionaryChunk;
 import org.apache.carbondata.format.SegmentInfo;
 
 /**
@@ -124,18 +127,39 @@ public class CarbonMetadataUtil {
     return numberOfRows;
   }
 
-  public static BlockletIndex getBlockletIndex(List<EncodedTablePage> encodedTablePageList,
+  private static EncodedColumnPage[] getEncodedColumnPages(EncodedBlocklet encodedBlocklet,
+      boolean isDimension, int pageIndex) {
+    int size =
+        isDimension ? encodedBlocklet.getNumberOfDimension() : encodedBlocklet.getNumberOfMeasure();
+    EncodedColumnPage [] encodedPages = new EncodedColumnPage[size];
+    // collect the page at pageIndex from every dimension (or measure) column of the blocklet
+    for (int i = 0; i < size; i++) {
+      if (isDimension) {
+        encodedPages[i] =
+            encodedBlocklet.getEncodedDimensionColumnPages().get(i).getEncodedColumnPageList()
+                .get(pageIndex);
+      } else {
+        encodedPages[i] =
+            encodedBlocklet.getEncodedMeasureColumnPages().get(i).getEncodedColumnPageList()
+                .get(pageIndex);
+      }
+    }
+    return encodedPages;
+  }
+  public static BlockletIndex getBlockletIndex(EncodedBlocklet encodedBlocklet,
       List<CarbonMeasure> carbonMeasureList) {
     BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
+
     // Calculating min/max for every each column.
-    TablePageStatistics stats = new TablePageStatistics(encodedTablePageList.get(0).getDimensions(),
-        encodedTablePageList.get(0).getMeasures());
+    TablePageStatistics stats =
+        new TablePageStatistics(getEncodedColumnPages(encodedBlocklet, true, 0),
+            getEncodedColumnPages(encodedBlocklet, false, 0));
     byte[][] minCol = stats.getDimensionMinValue().clone();
     byte[][] maxCol = stats.getDimensionMaxValue().clone();
 
-    for (EncodedTablePage encodedTablePage : encodedTablePageList) {
-      stats = new TablePageStatistics(encodedTablePage.getDimensions(),
-          encodedTablePage.getMeasures());
+    for (int pageIndex = 0; pageIndex < encodedBlocklet.getNumberOfPages(); pageIndex++) {
+      stats = new TablePageStatistics(getEncodedColumnPages(encodedBlocklet, true, pageIndex),
+          getEncodedColumnPages(encodedBlocklet, false, pageIndex));
       byte[][] columnMaxData = stats.getDimensionMaxValue();
       byte[][] columnMinData = stats.getDimensionMinValue();
       for (int i = 0; i < maxCol.length; i++) {
@@ -155,16 +179,16 @@ public class CarbonMetadataUtil {
       blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
     }
 
-    stats = new TablePageStatistics(encodedTablePageList.get(0).getDimensions(),
-        encodedTablePageList.get(0).getMeasures());
+    stats = new TablePageStatistics(getEncodedColumnPages(encodedBlocklet, true, 0),
+        getEncodedColumnPages(encodedBlocklet, false, 0));
     byte[][] measureMaxValue = stats.getMeasureMaxValue().clone();
     byte[][] measureMinValue = stats.getMeasureMinValue().clone();
     byte[] minVal = null;
     byte[] maxVal = null;
-    for (int i = 1; i < encodedTablePageList.size(); i++) {
+    for (int i = 1; i < encodedBlocklet.getNumberOfPages(); i++) {
       for (int j = 0; j < measureMinValue.length; j++) {
-        stats = new TablePageStatistics(
-            encodedTablePageList.get(i).getDimensions(), encodedTablePageList.get(i).getMeasures());
+        stats = new TablePageStatistics(getEncodedColumnPages(encodedBlocklet, true, i),
+            getEncodedColumnPages(encodedBlocklet, false, i));
         minVal = stats.getMeasureMinValue()[j];
         maxVal = stats.getMeasureMaxValue()[j];
         if (compareMeasureData(measureMaxValue[j], maxVal, carbonMeasureList.get(j).getDataType())
@@ -185,10 +209,11 @@ public class CarbonMetadataUtil {
       blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
     }
     BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
-    byte[] startKey = encodedTablePageList.get(0).getPageKey().serializeStartKey();
+    byte[] startKey = encodedBlocklet.getPageMetadataList().get(0).serializeStartKey();
     blockletBTreeIndex.setStart_key(startKey);
-    byte[] endKey = encodedTablePageList.get(
-        encodedTablePageList.size() - 1).getPageKey().serializeEndKey();
+    byte[] endKey =
+        encodedBlocklet.getPageMetadataList().get(encodedBlocklet.getPageMetadataList().size() - 1)
+            .serializeEndKey();
     blockletBTreeIndex.setEnd_key(endKey);
     BlockletIndex blockletIndex = new BlockletIndex();
     blockletIndex.setMin_max_index(blockletMinMaxIndex);
@@ -300,7 +325,8 @@ public class CarbonMetadataUtil {
   /**
    * return DataChunk3 that contains the input DataChunk2 list
    */
-  public static DataChunk3 getDataChunk3(List<DataChunk2> dataChunksList) {
+  public static DataChunk3 getDataChunk3(List<DataChunk2> dataChunksList,
+      LocalDictionaryChunk encodedDictionary) {
     int offset = 0;
     DataChunk3 dataChunk = new DataChunk3();
     List<Integer> pageOffsets = new ArrayList<>();
@@ -313,6 +339,7 @@ public class CarbonMetadataUtil {
       pageLengths.add(length);
       offset += length;
     }
+    dataChunk.setLocal_dictionary(encodedDictionary);
     dataChunk.setData_chunk_list(dataChunksList);
     dataChunk.setPage_length(pageLengths);
     dataChunk.setPage_offset(pageOffsets);
@@ -323,26 +350,32 @@ public class CarbonMetadataUtil {
    * return DataChunk3 for the dimension column (specifed by `columnIndex`)
    * in `encodedTablePageList`
    */
-  public static DataChunk3 getDimensionDataChunk3(List<EncodedTablePage> encodedTablePageList,
-      int columnIndex) throws IOException {
-    List<DataChunk2> dataChunksList = new ArrayList<>(encodedTablePageList.size());
-    for (EncodedTablePage encodedTablePage : encodedTablePageList) {
-      dataChunksList.add(encodedTablePage.getDimension(columnIndex).getPageMetadata());
+  public static DataChunk3 getDimensionDataChunk3(EncodedBlocklet encodedBlocklet,
+      int columnIndex) {
+    List<DataChunk2> dataChunksList = new ArrayList<>();
+    BlockletEncodedColumnPage blockletEncodedColumnPage =
+        encodedBlocklet.getEncodedDimensionColumnPages().get(columnIndex);
+    for (EncodedColumnPage encodedColumnPage : blockletEncodedColumnPage
+        .getEncodedColumnPageList()) {
+      dataChunksList.add(encodedColumnPage.getPageMetadata());
     }
-    return CarbonMetadataUtil.getDataChunk3(dataChunksList);
+    return CarbonMetadataUtil
+        .getDataChunk3(dataChunksList, blockletEncodedColumnPage.getEncodedDictionary());
   }
 
   /**
    * return DataChunk3 for the measure column (specifed by `columnIndex`)
    * in `encodedTablePageList`
    */
-  public static DataChunk3 getMeasureDataChunk3(List<EncodedTablePage> encodedTablePageList,
-      int columnIndex) throws IOException {
-    List<DataChunk2> dataChunksList = new ArrayList<>(encodedTablePageList.size());
-    for (EncodedTablePage encodedTablePage : encodedTablePageList) {
-      dataChunksList.add(encodedTablePage.getMeasure(columnIndex).getPageMetadata());
+  public static DataChunk3 getMeasureDataChunk3(EncodedBlocklet encodedBlocklet, int columnIndex) {
+    List<DataChunk2> dataChunksList = new ArrayList<>();
+    BlockletEncodedColumnPage blockletEncodedColumnPage =
+        encodedBlocklet.getEncodedMeasureColumnPages().get(columnIndex);
+    for (EncodedColumnPage encodedColumnPage : blockletEncodedColumnPage
+        .getEncodedColumnPageList()) {
+      dataChunksList.add(encodedColumnPage.getPageMetadata());
     }
-    return CarbonMetadataUtil.getDataChunk3(dataChunksList);
+    return CarbonMetadataUtil.getDataChunk3(dataChunksList, null);
   }
 
   private static int compareMeasureData(byte[] first, byte[] second, DataType dataType) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index da31ea3..2909dc4 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -173,71 +173,6 @@ public class CarbonMetadataUtilTest {
     assertEquals(indexHeader, indexheaderResult);
   }
 
-  @Test public void testConvertFileFooter() throws Exception {
-    int[] cardinality = { 1, 2, 3, 4, 5 };
-
-    org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema colSchema =
-        new org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema();
-    org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema colSchema1 =
-        new org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema();
-    List<org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema>
-        columnSchemaList = new ArrayList<>();
-    columnSchemaList.add(colSchema);
-    columnSchemaList.add(colSchema1);
-
-    SegmentProperties segmentProperties = new SegmentProperties(columnSchemaList, cardinality);
-
-    final EncodedColumnPage measure = new EncodedColumnPage(new DataChunk2(), new byte[]{0,1},
-        PrimitivePageStatsCollector.newInstance(
-        org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE));
-    new MockUp<EncodedTablePage>() {
-      @SuppressWarnings("unused") @Mock
-      public EncodedColumnPage getMeasure(int measureIndex) {
-        return measure;
-      }
-    };
-
-    new MockUp<TablePageKey>() {
-      @SuppressWarnings("unused") @Mock
-      public byte[] serializeStartKey() {
-        return new byte[]{1, 2};
-      }
-
-      @SuppressWarnings("unused") @Mock
-      public byte[] serializeEndKey() {
-        return new byte[]{1, 2};
-      }
-    };
-
-    TablePageKey key = new TablePageKey(3, segmentProperties, false);
-    EncodedTablePage encodedTablePage = EncodedTablePage.newInstance(3, new EncodedColumnPage[0], new EncodedColumnPage[0],
-        key);
-
-    List<EncodedTablePage> encodedTablePageList = new ArrayList<>();
-    encodedTablePageList.add(encodedTablePage);
-
-    BlockletInfo3 blockletInfoColumnar1 = new BlockletInfo3();
-
-    List<BlockletInfo3> blockletInfoColumnarList = new ArrayList<>();
-    blockletInfoColumnarList.add(blockletInfoColumnar1);
-
-    byte[] byteMaxArr = "1".getBytes();
-    byte[] byteMinArr = "2".getBytes();
-
-    BlockletIndex index = getBlockletIndex(encodedTablePageList, segmentProperties.getMeasures());
-    List<BlockletIndex> indexList = new ArrayList<>();
-    indexList.add(index);
-
-    BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
-    blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(byteMaxArr));
-    blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(byteMinArr));
-    FileFooter3 footer = convertFileFooterVersion3(blockletInfoColumnarList,
-        indexList,
-        cardinality, 2);
-    assertEquals(footer.getBlocklet_index_list(), indexList);
-
-  }
-
   @Test public void testGetBlockIndexInfo() throws Exception {
     byte[] startKey = { 1, 2, 3, 4, 5 };
     byte[] endKey = { 9, 3, 5, 5, 5 };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index 1c15f3d..a495b6d 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -145,6 +145,7 @@ struct DataChunk3{
     1: required list<DataChunk2> data_chunk_list; // List of data chunk
     2: optional list<i32> page_offset; // Offset of each chunk
     3: optional list<i32> page_length; // Length of each chunk
+    4: optional LocalDictionaryChunk local_dictionary; // to store blocklet local dictionary values
    
  }
 /**
@@ -230,4 +231,15 @@ struct BlockletHeader{
 	3: optional BlockletIndex blocklet_index;  // Index for the following blocklet
 	4: required BlockletInfo blocklet_info;  // Info for the following blocklet
 	5: optional dictionary.ColumnDictionaryChunk dictionary; // Blocklet local dictionary
+}
+
+struct LocalDictionaryChunk {
+  1: required LocalDictionaryChunkMeta dictionary_meta
+	2: required binary dictionary_data; // the values in dictionary order, each value is represented in binary format
+	3: required binary dictionary_values; // surrogate keys used in the blocklet
+}
+
+struct LocalDictionaryChunkMeta {
+  1: required list<schema.Encoding> encoders; // The List of encoders overridden at node level
+  2: required list<binary> encoder_meta; // Extra information required by encoders
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 4ce80a6..da34746 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -65,10 +65,12 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
    */
   private int dataCounter;
 
-  private ArrayDataType(int outputArrayIndex, int dataCounter, GenericDataType children) {
+  private ArrayDataType(int outputArrayIndex, int dataCounter, GenericDataType children,
+      String name) {
     this.outputArrayIndex = outputArrayIndex;
     this.dataCounter = dataCounter;
     this.children = children;
+    this.name = name;
   }
 
 
@@ -108,7 +110,7 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
    * return column unique id
    */
   @Override
-  public String getColumnId() {
+  public String getColumnNames() {
     return columnId;
   }
 
@@ -285,7 +287,8 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
 
   @Override
   public GenericDataType<ArrayObject> deepCopy() {
-    return new ArrayDataType(this.outputArrayIndex, this.dataCounter, this.children.deepCopy());
+    return new ArrayDataType(this.outputArrayIndex, this.dataCounter, this.children.deepCopy(),
+        this.name);
   }
 
   @Override
@@ -293,4 +296,10 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
     type.add(ColumnType.COMPLEX_ARRAY);
     children.getChildrenType(type);
   }
+
+  @Override public void getColumnNames(List<String> columnNameList) {
+    columnNameList.add(name);
+    children.getColumnNames(columnNameList);
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index 8b1ccf2..049bf57 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -100,7 +100,7 @@ public interface GenericDataType<T> {
   /**
    * @return column uuid string
    */
-  String getColumnId();
+  String getColumnNames();
 
   /**
    * set array index to be referred while creating metadata column
@@ -159,4 +159,6 @@ public interface GenericDataType<T> {
 
   void getChildrenType(List<ColumnType> type);
 
+  void getColumnNames(List<String> columnNameList);
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 3a477ce..5d22e55 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -235,7 +235,7 @@ public class PrimitiveDataType implements GenericDataType<Object> {
    * get column unique id
    */
   @Override
-  public String getColumnId() {
+  public String getColumnNames() {
     return columnId;
   }
 
@@ -536,11 +536,15 @@ public class PrimitiveDataType implements GenericDataType<Object> {
     dataType.nullformat = this.nullformat;
     dataType.setKeySize(this.keySize);
     dataType.setSurrogateIndex(this.index);
-
+    dataType.name = this.name;
     return dataType;
   }
 
   public void getChildrenType(List<ColumnType> type) {
     type.add(ColumnType.COMPLEX_PRIMITIVE);
   }
+
+  @Override public void getColumnNames(List<String> columnNameList) {
+    columnNameList.add(name);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index b66eef7..4d3ba87 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -60,10 +60,12 @@ public class StructDataType implements GenericDataType<StructObject> {
    */
   private int dataCounter;
 
-  private StructDataType(List<GenericDataType> children, int outputArrayIndex, int dataCounter) {
+  private StructDataType(List<GenericDataType> children, int outputArrayIndex, int dataCounter,
+      String name) {
     this.children = children;
     this.outputArrayIndex = outputArrayIndex;
     this.dataCounter = dataCounter;
+    this.name = name;
   }
 
   /**
@@ -113,7 +115,7 @@ public class StructDataType implements GenericDataType<StructObject> {
    * get column unique id
    */
   @Override
-  public String getColumnId() {
+  public String getColumnNames() {
     return columnId;
   }
 
@@ -318,7 +320,7 @@ public class StructDataType implements GenericDataType<StructObject> {
     for (GenericDataType child : children) {
       childrenClone.add(child.deepCopy());
     }
-    return new StructDataType(childrenClone, this.outputArrayIndex, this.dataCounter);
+    return new StructDataType(childrenClone, this.outputArrayIndex, this.dataCounter, this.name);
   }
 
   public void getChildrenType(List<ColumnType> type) {
@@ -327,4 +329,11 @@ public class StructDataType implements GenericDataType<StructObject> {
       children.get(i).getChildrenType(type);
     }
   }
+
+  @Override public void getColumnNames(List<String> columnNameList) {
+    columnNameList.add(name);
+    for (int i = 0; i < children.size(); i++) {
+      children.get(i).getColumnNames(columnNameList);
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 5fe3261..f3cb9c3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -49,7 +49,6 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
 
 /**
@@ -137,44 +136,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   }
 
   private void initParameters(CarbonFactDataHandlerModel model) {
-    SortScopeOptions.SortScope sortScope = model.getSortScope();
     this.colGrpModel = model.getSegmentProperties().getColumnGroupModel();
-
-    // in compaction flow the measure with decimal type will come as spark decimal.
-    // need to convert it to byte array.
-    if (model.isCompactionFlow()) {
-      try {
-        numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.NUM_CORES_COMPACTING,
-                CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
-      } catch (NumberFormatException exc) {
-        LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_COMPACTING
-            + "is wrong.Falling back to the default value "
-            + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-        numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-      }
-    } else {
-      numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
-    }
-
-    if (sortScope != null && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
-      numberOfCores = 1;
-    }
-    // Overriding it to the task specified cores.
-    if (model.getWritingCoresCount() > 0) {
-      numberOfCores = model.getWritingCoresCount();
-    }
-
+    this.numberOfCores = model.getNumberOfCores();
     blockletProcessingCount = new AtomicInteger(0);
-    producerExecutorService = Executors.newFixedThreadPool(numberOfCores,
-        new CarbonThreadFactory("ProducerPool:" + model.getTableName()
-            + ", range: " + model.getBucketId()));
+    producerExecutorService = Executors.newFixedThreadPool(model.getNumberOfCores(),
+        new CarbonThreadFactory(
+            "ProducerPool_" + System.nanoTime() + ":" + model.getTableName() + ", range: " + model
+                .getBucketId()));
     producerExecutorServiceTaskList =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     LOGGER.info("Initializing writer executors");
-    consumerExecutorService = Executors
-        .newFixedThreadPool(1, new CarbonThreadFactory("ConsumerPool:" + model.getTableName()
-            + ", range: " + model.getBucketId()));
+    consumerExecutorService = Executors.newFixedThreadPool(1, new CarbonThreadFactory(
+        "ConsumerPool_" + System.nanoTime() + ":" + model.getTableName() + ", range: " + model
+            .getBucketId()));
     consumerExecutorServiceTaskList = new ArrayList<>(1);
     semaphore = new Semaphore(numberOfCores);
     tablePageList = new TablePageList();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 27249ab..5b12229 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -23,10 +23,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.localdictionary.generator.ColumnLocalDictionaryGenerator;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
@@ -35,6 +39,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
@@ -50,6 +55,12 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 public class CarbonFactDataHandlerModel {
 
   /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonFactDataHandlerModel.class.getName());
+
+  /**
    * dbName
    */
   private String databaseName;
@@ -163,6 +174,10 @@ public class CarbonFactDataHandlerModel {
 
   private short writingCoresCount;
 
+  private Map<String, LocalDictionaryGenerator> columnLocalDictGenMap;
+
+  private int numberOfCores;
+
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
    */
@@ -272,7 +287,8 @@ public class CarbonFactDataHandlerModel {
     }
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
-
+    setLocalDictToModel(carbonTable, wrapperColumnSchema, carbonFactDataHandlerModel);
+    setNumberOfCores(carbonFactDataHandlerModel);
     return carbonFactDataHandlerModel;
   }
 
@@ -340,8 +356,9 @@ public class CarbonFactDataHandlerModel {
             carbonFactDataHandlerModel.getTaskExtension(),
             String.valueOf(loadModel.getFactTimeStamp()),
             loadModel.getSegmentId()));
-
+    setLocalDictToModel(carbonTable, wrapperColumnSchema, carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
+    setNumberOfCores(carbonFactDataHandlerModel);
     return carbonFactDataHandlerModel;
   }
 
@@ -623,5 +640,86 @@ public class CarbonFactDataHandlerModel {
     return dataMapWriterlistener;
   }
 
+  public Map<String, LocalDictionaryGenerator> getColumnLocalDictGenMap() {
+    return columnLocalDictGenMap;
+  }
+
+  /**
+   * This method prepares a map which will have column and local dictionary generator mapping for
+   * all the local dictionary columns.
+   * @param carbonTable
+   * @param wrapperColumnSchema
+   * @param carbonFactDataHandlerModel
+   */
+  private static void setLocalDictToModel(CarbonTable carbonTable,
+      List<ColumnSchema> wrapperColumnSchema,
+      CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
+    boolean islocalDictEnabled = carbonTable.isLocalDictionaryEnabled();
+    // the map is always created; it stays empty (never null) when local dictionary is disabled
+    Map<String, LocalDictionaryGenerator> columnLocalDictGenMap = new HashMap<>();
+    if (islocalDictEnabled) {
+      int localDictionaryThreshold = carbonTable.getLocalDictionaryThreshold();
+      for (ColumnSchema columnSchema : wrapperColumnSchema) {
+        // check whether the column is local dictionary column or not
+        if (columnSchema.isLocalDictColumn()) {
+          columnLocalDictGenMap.put(columnSchema.getColumnName(),
+              new ColumnLocalDictionaryGenerator(localDictionaryThreshold));
+        }
+      }
+    }
+    if (islocalDictEnabled) {
+      LOGGER.info("Local dictionary is enabled for table: " + carbonTable.getTableUniqueName());
+      LOGGER.info(
+          "Local dictionary threshold for table: " + carbonTable.getTableUniqueName() + " is: "
+              + carbonTable.getLocalDictionaryThreshold());
+      Iterator<Map.Entry<String, LocalDictionaryGenerator>> iterator =
+          columnLocalDictGenMap.entrySet().iterator();
+      StringBuilder stringBuilder = new StringBuilder();
+      while (iterator.hasNext()) {
+        Map.Entry<String, LocalDictionaryGenerator> next = iterator.next();
+        stringBuilder.append(next.getKey());
+        stringBuilder.append(',');
+      }
+      LOGGER.info("Local dictionary will be generated for the columns:" + stringBuilder.toString()
+          + " for table: " + carbonTable.getTableUniqueName());
+    }
+    carbonFactDataHandlerModel.setColumnLocalDictGenMap(columnLocalDictGenMap);
+  }
+
+  public void setColumnLocalDictGenMap(
+      Map<String, LocalDictionaryGenerator> columnLocalDictGenMap) {
+    this.columnLocalDictGenMap = columnLocalDictGenMap;
+  }
+
+  private static void setNumberOfCores(CarbonFactDataHandlerModel model) {
+    // in compaction flow the number of cores is read from the compaction specific property;
+    // an unparsable configured value falls back to the default number of cores.
+    if (model.isCompactionFlow()) {
+      try {
+        model.numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.NUM_CORES_COMPACTING,
+                CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+      } catch (NumberFormatException exc) {
+        LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_COMPACTING
+            + "is wrong.Falling back to the default value "
+            + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+        model.numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+      }
+    } else {
+      model.numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+    }
+
+    if (model.sortScope != null && model.sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
+      model.numberOfCores = 1;
+    }
+    // Overriding it to the task specified cores.
+    if (model.getWritingCoresCount() > 0) {
+      model.numberOfCores = model.getWritingCoresCount();
+    }
+  }
+
+  public int getNumberOfCores() {
+    return numberOfCores;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index b1b966b..c634a6d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -45,6 +45,7 @@ import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCo
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -64,8 +65,8 @@ public class TablePage {
   // one vector to make it efficient for sorting
   private ColumnPage[] dictDimensionPages;
   private ColumnPage[] noDictDimensionPages;
-  private ComplexColumnPage[] complexDimensionPages;
   private ColumnPage[] measurePages;
+  private ComplexColumnPage[] complexDimensionPages;
 
   // the num of rows in this page, it must be less than short value (65536)
   private int pageSize;
@@ -104,19 +105,26 @@ public class TablePage {
         page.setStatsCollector(KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY));
         dictDimensionPages[tmpNumDictDimIdx++] = page;
       } else {
+        // no-dictionary dimension: STRING page by default, VARCHAR page for varchar columns
+        LocalDictionaryGenerator localDictionaryGenerator =
+            model.getColumnLocalDictGenMap().get(spec.getFieldName());
+        DataType dataType = DataTypes.STRING;
         if (DataTypes.VARCHAR == spec.getSchemaDataType()) {
-          page = ColumnPage.newPage(spec, DataTypes.VARCHAR, pageSize);
+          dataType = DataTypes.VARCHAR;
+        }
+        if (null != localDictionaryGenerator) {
+          page = ColumnPage.newLocalDictPage(spec, dataType, pageSize, localDictionaryGenerator);
+        } else {
+          page = ColumnPage.newPage(spec, dataType, pageSize);
+        }
+        if (DataTypes.VARCHAR == dataType) {
           page.setStatsCollector(LVLongStringStatsCollector.newInstance());
         } else {
-          // In previous implementation, other data types such as string, date and timestamp
-          // will be encoded using string page
-          page = ColumnPage.newPage(spec, DataTypes.STRING, pageSize);
           page.setStatsCollector(LVShortStringStatsCollector.newInstance());
         }
         noDictDimensionPages[tmpNumNoDictDimIdx++] = page;
       }
     }
-
     complexDimensionPages = new ComplexColumnPage[model.getComplexColumnCount()];
     for (int i = 0; i < complexDimensionPages.length; i++) {
       // here we still do not the depth of the complex column, it will be initialized when
@@ -137,6 +145,7 @@ public class TablePage {
           PrimitivePageStatsCollector.newInstance(dataTypes[i]));
       measurePages[i] = page;
     }
+
     boolean hasNoDictionary = noDictDimensionPages.length > 0;
     this.key = new TablePageKey(pageSize, model.getSegmentProperties(), hasNoDictionary);
 
@@ -225,8 +234,16 @@ public class TablePage {
     // initialize the page if first row
     if (rowId == 0) {
       List<ColumnType> complexColumnType = new ArrayList<>();
+      List<String> columnNames = new ArrayList<>();
       complexDataType.getChildrenType(complexColumnType);
-      complexDimensionPages[index] = new ComplexColumnPage(pageSize, complexColumnType);
+      complexDataType.getColumnNames(columnNames);
+      complexDimensionPages[index] = new ComplexColumnPage(complexColumnType);
+      try {
+        complexDimensionPages[index]
+            .initialize(model.getColumnLocalDictGenMap(), columnNames, pageSize);
+      } catch (MemoryException e) {
+        throw new RuntimeException(e);
+      }
     }
 
     int depthInComplexColumn = complexDimensionPages[index].getDepth();
@@ -253,7 +270,7 @@ public class TablePage {
     }
 
     for (int depth = 0; depth < depthInComplexColumn; depth++) {
-      complexDimensionPages[index].putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
+      complexDimensionPages[index].putComplexData(depth, encodedComplexColumnar.get(depth));
     }
   }
 
@@ -267,6 +284,11 @@ public class TablePage {
     for (ColumnPage page : measurePages) {
       page.freeMemory();
     }
+    for (ComplexColumnPage page : complexDimensionPages) {
+      if (null != page) {
+        page.freeMemory();
+      }
+    }
   }
 
   // Adds length as a short element (first 2 bytes) to the head of the input byte array

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index b76722b..3082b91 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -149,6 +149,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    */
   private boolean enableDirectlyWriteData2Hdfs = false;
 
+  protected ExecutorService fallbackExecutorService;
+
   public AbstractFactDataWriter(CarbonFactDataHandlerModel model) {
     this.model = model;
     blockIndexInfoList = new ArrayList<>();
@@ -197,6 +199,14 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     blockletMetadata = new ArrayList<BlockletInfo3>();
     blockletIndex = new ArrayList<>();
     listener = this.model.getDataMapWriterlistener();
+    if (model.getColumnLocalDictGenMap().size() > 0) {
+      int numberOfCores = 1;
+      if (model.getNumberOfCores() > 1) {
+        numberOfCores = model.getNumberOfCores() / 2;
+      }
+      fallbackExecutorService = Executors.newFixedThreadPool(numberOfCores, new CarbonThreadFactory(
+          "FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId()));
+    }
   }
 
   /**
@@ -415,6 +425,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     } catch (InterruptedException | ExecutionException | IOException e) {
       throw new CarbonDataWriterException(e);
     }
+    if (null != fallbackExecutorService) {
+      fallbackExecutorService.shutdownNow();
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
index 36fda3c..7607cf0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
@@ -16,29 +16,34 @@
  */
 package org.apache.carbondata.processing.store.writer.v3;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.concurrent.ExecutorService;
 
+import org.apache.carbondata.core.datastore.blocklet.EncodedBlocklet;
 import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.processing.store.TablePage;
 
 public class BlockletDataHolder {
-  private List<EncodedTablePage> encodedTablePage;
+
+  /**
+   * current data size
+   */
   private long currentSize;
 
-  public BlockletDataHolder() {
-    this.encodedTablePage = new ArrayList<>();
+  private EncodedBlocklet encodedBlocklet;
+
+  public BlockletDataHolder(ExecutorService fallbackpool) {
+    encodedBlocklet = new EncodedBlocklet(fallbackpool);
   }
 
   public void clear() {
-    encodedTablePage.clear();
     currentSize = 0;
+    encodedBlocklet.clear();
   }
 
   public void addPage(TablePage rawTablePage) {
     EncodedTablePage encodedTablePage = rawTablePage.getEncodedTablePage();
-    this.encodedTablePage.add(encodedTablePage);
     currentSize += encodedTablePage.getEncodedSize();
+    encodedBlocklet.addEncodedTablePage(encodedTablePage);
   }
 
   public long getSize() {
@@ -47,19 +52,14 @@ public class BlockletDataHolder {
   }
 
   public int getNumberOfPagesAdded() {
-    return encodedTablePage.size();
+    return encodedBlocklet.getNumberOfPages();
   }
 
   public int getTotalRows() {
-    int rows = 0;
-    for (EncodedTablePage nh : encodedTablePage) {
-      rows += nh.getPageSize();
-    }
-    return rows;
+    return encodedBlocklet.getBlockletSize();
   }
 
-  public List<EncodedTablePage> getEncodedTablePages() {
-    return encodedTablePage;
+  public EncodedBlocklet getEncodedBlocklet() {
+    return encodedBlocklet;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index d1deef1..e562f26 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -25,8 +25,9 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.blocklet.BlockletEncodedColumnPage;
+import org.apache.carbondata.core.datastore.blocklet.EncodedBlocklet;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
@@ -76,7 +77,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
       blockletSizeThreshold = fileSizeInBytes;
       LOGGER.info("Blocklet size configure for table is: " + blockletSizeThreshold);
     }
-    blockletDataHolder = new BlockletDataHolder();
+    blockletDataHolder = new BlockletDataHolder(fallbackExecutorService);
   }
 
   @Override protected void writeBlockletInfoToFile()
@@ -110,14 +111,15 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
    */
   @Override public void writeTablePage(TablePage tablePage)
       throws CarbonDataWriterException,IOException {
+
     // condition for writting all the pages
     if (!tablePage.isLastPage()) {
       boolean isAdded = false;
       // check if size more than blocklet size then write the page to file
-      if (blockletDataHolder.getSize() + tablePage.getEncodedTablePage().getEncodedSize() >=
-          blockletSizeThreshold) {
+      if (blockletDataHolder.getSize() + tablePage.getEncodedTablePage().getEncodedSize()
+          >= blockletSizeThreshold) {
         // if blocklet size exceeds threshold, write blocklet data
-        if (blockletDataHolder.getEncodedTablePages().size() == 0) {
+        if (blockletDataHolder.getNumberOfPagesAdded() == 0) {
           isAdded = true;
           addPageData(tablePage);
         }
@@ -164,12 +166,13 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
    */
   private void writeBlockletToFile() {
     // get the list of all encoded table page
-    List<EncodedTablePage> encodedTablePageList = blockletDataHolder.getEncodedTablePages();
-    int numDimensions = encodedTablePageList.get(0).getNumDimensions();
-    int numMeasures = encodedTablePageList.get(0).getNumMeasures();
+    EncodedBlocklet encodedBlocklet = blockletDataHolder.getEncodedBlocklet();
+    int numDimensions = encodedBlocklet.getNumberOfDimension();
+    int numMeasures = encodedBlocklet.getNumberOfMeasure();
+
     // get data chunks for all the column
     byte[][] dataChunkBytes = new byte[numDimensions + numMeasures][];
-    long metadataSize = fillDataChunk(encodedTablePageList, dataChunkBytes);
+    long metadataSize = fillDataChunk(encodedBlocklet, dataChunkBytes);
     // calculate the total size of data to be written
     long blockletSize = blockletDataHolder.getSize() + metadataSize;
     // to check if data size will exceed the block size then create a new file
@@ -199,27 +202,22 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
   /**
    * Fill dataChunkBytes and return total size of page metadata
    */
-  private long fillDataChunk(List<EncodedTablePage> encodedTablePageList, byte[][] dataChunkBytes) {
+  private long fillDataChunk(EncodedBlocklet encodedBlocklet, byte[][] dataChunkBytes) {
     int size = 0;
-    int numDimensions = encodedTablePageList.get(0).getNumDimensions();
-    int numMeasures = encodedTablePageList.get(0).getNumMeasures();
+    int numDimensions = encodedBlocklet.getNumberOfDimension();
+    int numMeasures = encodedBlocklet.getNumberOfMeasure();
     int measureStartIndex = numDimensions;
     // calculate the size of data chunks
-    try {
-      for (int i = 0; i < numDimensions; i++) {
-        dataChunkBytes[i] = CarbonUtil.getByteArray(
-            CarbonMetadataUtil.getDimensionDataChunk3(encodedTablePageList, i));
-        size += dataChunkBytes[i].length;
-      }
-      for (int i = 0; i < numMeasures; i++) {
-        dataChunkBytes[measureStartIndex] = CarbonUtil.getByteArray(
-            CarbonMetadataUtil.getMeasureDataChunk3(encodedTablePageList, i));
-        size += dataChunkBytes[measureStartIndex].length;
-        measureStartIndex++;
-      }
-    } catch (IOException e) {
-      LOGGER.error(e, "Problem while getting the data chunks");
-      throw new CarbonDataWriterException("Problem while getting the data chunks", e);
+    for (int i = 0; i < numDimensions; i++) {
+      dataChunkBytes[i] =
+          CarbonUtil.getByteArray(CarbonMetadataUtil.getDimensionDataChunk3(encodedBlocklet, i));
+      size += dataChunkBytes[i].length;
+    }
+    for (int i = 0; i < numMeasures; i++) {
+      dataChunkBytes[measureStartIndex] =
+          CarbonUtil.getByteArray(CarbonMetadataUtil.getMeasureDataChunk3(encodedBlocklet, i));
+      size += dataChunkBytes[measureStartIndex].length;
+      measureStartIndex++;
     }
     return size;
   }
@@ -250,33 +248,30 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
     List<Long> currentDataChunksOffset = new ArrayList<>();
     // to maintain the length of each data chunk in blocklet
     List<Integer> currentDataChunksLength = new ArrayList<>();
-    List<EncodedTablePage> encodedTablePages = blockletDataHolder.getEncodedTablePages();
-    int numberOfDimension = encodedTablePages.get(0).getNumDimensions();
-    int numberOfMeasures = encodedTablePages.get(0).getNumMeasures();
+    EncodedBlocklet encodedBlocklet = blockletDataHolder.getEncodedBlocklet();
+    int numberOfDimension = encodedBlocklet.getNumberOfDimension();
+    int numberOfMeasures = encodedBlocklet.getNumberOfMeasure();
     ByteBuffer buffer = null;
     long dimensionOffset = 0;
     long measureOffset = 0;
-    int numberOfRows = 0;
-    // calculate the number of rows in each blocklet
-    for (EncodedTablePage encodedTablePage : encodedTablePages) {
-      numberOfRows += encodedTablePage.getPageSize();
-    }
     for (int i = 0; i < numberOfDimension; i++) {
       currentDataChunksOffset.add(offset);
       currentDataChunksLength.add(dataChunkBytes[i].length);
       buffer = ByteBuffer.wrap(dataChunkBytes[i]);
       currentOffsetInFile += fileChannel.write(buffer);
       offset += dataChunkBytes[i].length;
-      for (EncodedTablePage encodedTablePage : encodedTablePages) {
-        EncodedColumnPage dimension = encodedTablePage.getDimension(i);
-        buffer = dimension.getEncodedData();
+      BlockletEncodedColumnPage blockletEncodedColumnPage =
+          encodedBlocklet.getEncodedDimensionColumnPages().get(i);
+      for (EncodedColumnPage dimensionPage : blockletEncodedColumnPage
+          .getEncodedColumnPageList()) {
+        buffer = dimensionPage.getEncodedData();
         int bufferSize = buffer.limit();
         currentOffsetInFile += fileChannel.write(buffer);
         offset += bufferSize;
       }
     }
     dimensionOffset = offset;
-    int dataChunkStartIndex = encodedTablePages.get(0).getNumDimensions();
+    int dataChunkStartIndex = encodedBlocklet.getNumberOfDimension();
     for (int i = 0; i < numberOfMeasures; i++) {
       currentDataChunksOffset.add(offset);
       currentDataChunksLength.add(dataChunkBytes[dataChunkStartIndex].length);
@@ -284,9 +279,11 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
       currentOffsetInFile += fileChannel.write(buffer);
       offset += dataChunkBytes[dataChunkStartIndex].length;
       dataChunkStartIndex++;
-      for (EncodedTablePage encodedTablePage : encodedTablePages) {
-        EncodedColumnPage measure = encodedTablePage.getMeasure(i);
-        buffer = measure.getEncodedData();
+      BlockletEncodedColumnPage blockletEncodedColumnPage =
+          encodedBlocklet.getEncodedMeasureColumnPages().get(i);
+      for (EncodedColumnPage measurePage : blockletEncodedColumnPage
+          .getEncodedColumnPageList()) {
+        buffer = measurePage.getEncodedData();
         int bufferSize = buffer.limit();
         currentOffsetInFile += fileChannel.write(buffer);
         offset += bufferSize;
@@ -295,10 +292,11 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
     measureOffset = offset;
     blockletIndex.add(
         CarbonMetadataUtil.getBlockletIndex(
-            encodedTablePages, model.getSegmentProperties().getMeasures()));
+            encodedBlocklet, model.getSegmentProperties().getMeasures()));
     BlockletInfo3 blockletInfo3 =
-        new BlockletInfo3(numberOfRows, currentDataChunksOffset, currentDataChunksLength,
-            dimensionOffset, measureOffset, blockletDataHolder.getEncodedTablePages().size());
+        new BlockletInfo3(encodedBlocklet.getBlockletSize(), currentDataChunksOffset,
+            currentDataChunksLength, dimensionOffset, measureOffset,
+            encodedBlocklet.getNumberOfPages());
     blockletMetadata.add(blockletInfo3);
   }
 


[2/2] carbondata git commit: [CARBONDATA-2587][CARBONDATA-2588] Local Dictionary Data Loading support

Posted by ra...@apache.org.
[CARBONDATA-2587][CARBONDATA-2588] Local Dictionary Data Loading support

What changes are proposed in this PR

Added code to support Local Dictionary Data Loading for primitive type
Added code to support Local Dictionary Data Loading for complex type.
How this PR is tested
Manual testing is done in 3 Node setup.
UT will be raised in different PR

This closes #2402


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e7103397
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e7103397
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e7103397

Branch: refs/heads/master
Commit: e7103397d96623955ad4f55fc1d0cac7c679a8d7
Parents: 5804d75
Author: kumarvishal09 <ku...@gmail.com>
Authored: Mon Jun 4 15:41:50 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Jun 27 15:03:31 2018 +0530

----------------------------------------------------------------------
 .../dictionary/DictionaryByteArrayWrapper.java  |   4 +
 .../carbondata/core/datastore/ColumnType.java   |   6 +-
 .../blocklet/BlockletEncodedColumnPage.java     | 176 +++++++++++
 .../datastore/blocklet/EncodedBlocklet.java     | 179 +++++++++++
 ...mpressedDimensionChunkFileBasedReaderV3.java |   3 +-
 ...ndexerStorageForNoInvertedIndexForShort.java |  12 +-
 .../core/datastore/page/ColumnPage.java         |  71 +++--
 .../core/datastore/page/ComplexColumnPage.java  | 149 ++++++---
 .../page/FallbackColumnPageEncoder.java         |  84 +++++
 .../page/FallbackEncodedColumnPage.java         |  49 +++
 .../datastore/page/LocalDictColumnPage.java     | 316 +++++++++++++++++++
 .../datastore/page/SafeVarLengthColumnPage.java |  21 +-
 .../datastore/page/UnsafeDecimalColumnPage.java |  12 +-
 .../page/UnsafeVarLengthColumnPage.java         |  16 +-
 .../datastore/page/VarLengthColumnPageBase.java |  38 +--
 .../page/encoding/ColumnPageEncoder.java        |  40 ++-
 .../page/encoding/EncodedColumnPage.java        |  31 +-
 .../legacy/DictDimensionIndexCodec.java         |   2 +-
 .../legacy/DirectDictDimensionIndexCodec.java   |   2 +-
 .../legacy/HighCardDictDimensionIndexCodec.java |   8 +-
 .../page/statistics/DummyStatsCollector.java    |  88 ++++++
 .../localdictionary/PageLevelDictionary.java    | 126 ++++++++
 .../dictionaryholder/DictionaryStore.java       |  50 +++
 .../MapBasedDictionaryStore.java                | 137 ++++++++
 .../DictionaryThresholdReachedException.java    |  87 +++++
 .../ColumnLocalDictionaryGenerator.java         |  75 +++++
 .../generator/LocalDictionaryGenerator.java     |  48 +++
 .../core/metadata/schema/table/CarbonTable.java |   2 +-
 .../core/util/CarbonMetadataUtil.java           |  89 ++++--
 .../core/util/CarbonMetadataUtilTest.java       |  65 ----
 format/src/main/thrift/carbondata.thrift        |  12 +
 .../processing/datatypes/ArrayDataType.java     |  15 +-
 .../processing/datatypes/GenericDataType.java   |   4 +-
 .../processing/datatypes/PrimitiveDataType.java |   8 +-
 .../processing/datatypes/StructDataType.java    |  15 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  42 +--
 .../store/CarbonFactDataHandlerModel.java       | 102 +++++-
 .../carbondata/processing/store/TablePage.java  |  38 ++-
 .../store/writer/AbstractFactDataWriter.java    |  13 +
 .../store/writer/v3/BlockletDataHolder.java     |  32 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   |  88 +++---
 41 files changed, 2001 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java
index 03c86ac..f812f86 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java
@@ -89,4 +89,8 @@ public class DictionaryByteArrayWrapper {
     result = 31 * result;
     return result;
   }
+
+  public byte[] getData() {
+    return data;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
index 8bbf12d..080444c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
@@ -37,7 +37,9 @@ public enum ColumnType {
 
   COMPLEX_ARRAY,
 
-  COMPLEX_PRIMITIVE;
+  COMPLEX_PRIMITIVE,
+
+  PLAIN_LONG_VALUE;
 
   public static ColumnType valueOf(int ordinal) {
     if (ordinal == GLOBAL_DICTIONARY.ordinal()) {
@@ -56,6 +58,8 @@ public enum ColumnType {
       return COMPLEX_ARRAY;
     } else if (ordinal == COMPLEX_PRIMITIVE.ordinal()) {
       return COMPLEX_PRIMITIVE;
+    } else if (ordinal == PLAIN_LONG_VALUE.ordinal()) {
+      return PLAIN_LONG_VALUE;
     } else {
       throw new RuntimeException("create ColumnType with invalid ordinal: " + ordinal);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java
new file mode 100644
index 0000000..6508787
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.blocklet;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.page.FallbackColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.FallbackEncodedColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
+import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.format.LocalDictionaryChunk;
+
+/**
+ * Maintains the list of encoded page of a column in a blocklet
+ * and encoded dictionary values only if column is encoded using local
+ * dictionary
+ * Handle the fallback if all the pages in blocklet are not
+ * encoded with local dictionary
+ */
+public class BlockletEncodedColumnPage {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockletEncodedColumnPage.class.getName());
+
+  /**
+   * list of encoded page of a column in a blocklet
+   */
+  private List<EncodedColumnPage> encodedColumnPageList;
+
+  /**
+   * fallback executor service
+   */
+  private ExecutorService fallbackExecutorService;
+
+  /**
+   * to check whether pages are local dictionary encoded or not
+   */
+  private boolean isLocalDictEncoded;
+
+  /**
+   * page level dictionary only when column is encoded with local dictionary
+   */
+  private PageLevelDictionary pageLevelDictionary;
+
+  /**
+   * fallback future task queue;
+   */
+  private ArrayDeque<Future<FallbackEncodedColumnPage>> fallbackFutureQueue;
+
+  BlockletEncodedColumnPage(ExecutorService fallbackExecutorService) {
+    this.fallbackExecutorService = fallbackExecutorService;
+  }
+
+  /**
+   * Below method will be used to add column page of a column
+   *
+   * @param encodedColumnPage
+   * encoded column page
+   */
+  void addEncodedColumnColumnPage(EncodedColumnPage encodedColumnPage) {
+    if (null == encodedColumnPageList) {
+      this.encodedColumnPageList = new ArrayList<>();
+      // first page of the column: remember whether it was encoded with local dictionary
+      if (encodedColumnPage.isLocalDictGeneratedPage()) {
+        this.isLocalDictEncoded = true;
+        // start the blocklet level dictionary from the first page dictionary
+        this.pageLevelDictionary = encodedColumnPage.getPageDictionary();
+      }
+      encodedColumnPageList.add(encodedColumnPage);
+      return;
+    }
+    // if earlier pages are not local dictionary encoded, or the new page is also
+    // local dictionary encoded, the page can be added directly
+    if (!isLocalDictEncoded || encodedColumnPage.isLocalDictGeneratedPage()) {
+      this.encodedColumnPageList.add(encodedColumnPage);
+      // merge the new page dictionary values into the blocklet level dictionary
+      if (null != this.pageLevelDictionary) {
+        pageLevelDictionary.mergerDictionaryValues(encodedColumnPage.getPageDictionary());
+      }
+    } else {
+      // older pages were encoded with dictionary but the new page is not: start fallback
+      isLocalDictEncoded = false;
+      pageLevelDictionary = null;
+      this.fallbackFutureQueue = new ArrayDeque<>();
+      LOGGER.info(
+          "Local dictionary Fallback is initiated for column: " + encodedColumnPageList.get(0)
+              .getActualPage().getColumnSpec().getFieldName());
+      // submit every older page for fallback encoding, keyed by its index in the page list
+      for (int pageIndex = 0; pageIndex < encodedColumnPageList.size(); pageIndex++) {
+        fallbackFutureQueue.add(fallbackExecutorService.submit(
+            new FallbackColumnPageEncoder(encodedColumnPageList.get(pageIndex), pageIndex)));
+      }
+      // add the new (non dictionary) page after the older pages have been submitted
+      this.encodedColumnPageList.add(encodedColumnPage);
+    }
+  }
+
+  /**
+   * Returns the encoded pages of this column, first swapping in any pending fallback results
+   *
+   * @return list of encoded pages of the column in the blocklet
+   */
+  public List<EncodedColumnPage> getEncodedColumnPageList() {
+    // a non-null fallback queue means fallback was initiated for the older pages
+    if (null != this.fallbackFutureQueue) {
+      try {
+        // drain the queue, consuming each future exactly once so no page is skipped
+        while (!fallbackFutureQueue.isEmpty()) {
+          // remove the head element of the queue and wait for its result
+          FallbackEncodedColumnPage fallbackEncodedColumnPage = fallbackFutureQueue.poll().get();
+          // replace the dictionary encoded page with its fallback encoded page
+          encodedColumnPageList.set(fallbackEncodedColumnPage.getPageIndex(),
+              fallbackEncodedColumnPage.getEncodedColumnPage());
+          // poll() above already removed the future; a second poll() would drop the next one
+        }
+      } catch (ExecutionException | InterruptedException e) {
+        throw new RuntimeException("Problem while encoding the blocklet data during fallback", e);
+      }
+      // setting to null as all the fallback encoded page has been added to list
+      fallbackFutureQueue = null;
+    }
+    // in case of dictionary encoded column page memory will be freed only after
+    // all the pages are added in a blocklet, as fallback can happen anytime so old pages memory
+    // cannot be freed, so after encoding is done we can free the page memory
+    if (null != pageLevelDictionary) {
+      // clear the memory footprint for local dictionary encoded pages
+      for (EncodedColumnPage columnPage : encodedColumnPageList) {
+        columnPage.freeMemory();
+      }
+    }
+    return encodedColumnPageList;
+  }
+
+  /**
+   * Below method will be used to get the encoded dictionary
+   * values for local dictionary generated columns
+   *
+   * @return encoded dictionary values if column is local dictionary generated
+   */
+  public LocalDictionaryChunk getEncodedDictionary() {
+    if (null != pageLevelDictionary) {
+      try {
+        return pageLevelDictionary.getLocalDictionaryChunkForBlocklet();
+      } catch (IOException | MemoryException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
new file mode 100644
index 0000000..794c439
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.blocklet;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.page.key.TablePageKey;
+
+/**
+ * Holds the blocklet level data and metadata to be written in carbondata file
+ * For dimension pages it will check if all the pages are not encoded with dictionary
+ * then it will encode those pages for that column again
+ */
+public class EncodedBlocklet {
+
+  /**
+   * number of rows in a blocklet
+   */
+  private int blockletSize;
+
+  /**
+   * list of page metadata
+   */
+  private List<TablePageKey> pageMetadataList;
+
+  /**
+   * maintains encoded dimension data for each column
+   */
+  private List<BlockletEncodedColumnPage> encodedDimensionColumnPages;
+
+  /**
+   * maintains encoded measure data for each column
+   */
+  private List<BlockletEncodedColumnPage> encodedMeasureColumnPages;
+
+  /**
+   * fallback executor service, will used to re-encode column pages
+   */
+  private ExecutorService executorService;
+
+  /**
+   * number of pages in a blocklet
+   */
+  private int numberOfPages;
+
+  public EncodedBlocklet(ExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  /**
+   * Below method will be used to add page metadata details
+   *
+   * @param encodedTablePage
+   * encoded table page
+   */
+  private void addPageMetadata(EncodedTablePage encodedTablePage) {
+    // for first table page create new list
+    if (null == pageMetadataList) {
+      pageMetadataList = new ArrayList<>();
+    }
+    // update details
+    blockletSize += encodedTablePage.getPageSize();
+    pageMetadataList.add(encodedTablePage.getPageKey());
+    this.numberOfPages++;
+  }
+
+  /**
+   * Below method will be used to add measure column pages
+   *
+   * @param encodedTablePage
+   * encoded table page
+   */
+  private void addEncodedMeasurePage(EncodedTablePage encodedTablePage) {
+    // for first page create new list
+    if (null == encodedMeasureColumnPages) {
+      encodedMeasureColumnPages = new ArrayList<>();
+      // adding measure pages
+      for (int i = 0; i < encodedTablePage.getNumMeasures(); i++) {
+        BlockletEncodedColumnPage blockletEncodedColumnPage = new BlockletEncodedColumnPage(null);
+        blockletEncodedColumnPage.addEncodedColumnColumnPage(encodedTablePage.getMeasure(i));
+        encodedMeasureColumnPages.add(blockletEncodedColumnPage);
+      }
+    } else {
+      for (int i = 0; i < encodedTablePage.getNumMeasures(); i++) {
+        encodedMeasureColumnPages.get(i).addEncodedColumnColumnPage(encodedTablePage.getMeasure(i));
+      }
+    }
+  }
+
+  /**
+   * Below method will be used to add dimension column pages
+   *
+   * @param encodedTablePage
+   * encoded table page
+   */
+  private void addEncodedDimensionPage(EncodedTablePage encodedTablePage) {
+    // for first page create new list
+    if (null == encodedDimensionColumnPages) {
+      encodedDimensionColumnPages = new ArrayList<>();
+      // adding dimension pages, one holder per dimension column
+      for (int i = 0; i < encodedTablePage.getNumDimensions(); i++) {
+        BlockletEncodedColumnPage blockletEncodedColumnPage =
+            new BlockletEncodedColumnPage(executorService);
+        blockletEncodedColumnPage.addEncodedColumnColumnPage(encodedTablePage.getDimension(i));
+        encodedDimensionColumnPages.add(blockletEncodedColumnPage);
+      }
+    } else {
+      for (int i = 0; i < encodedTablePage.getNumDimensions(); i++) {
+        encodedDimensionColumnPages.get(i)
+            .addEncodedColumnColumnPage(encodedTablePage.getDimension(i));
+      }
+    }
+  }
+
+  /**
+   * Use to add table pages
+   *
+   * @param encodedTablePage
+   * encoded table page
+   */
+  public void addEncodedTablePage(EncodedTablePage encodedTablePage) {
+    addPageMetadata(encodedTablePage);
+    addEncodedDimensionPage(encodedTablePage);
+    addEncodedMeasurePage(encodedTablePage);
+  }
+
+  public int getBlockletSize() {
+    return blockletSize;
+  }
+
+  public List<TablePageKey> getPageMetadataList() {
+    return pageMetadataList;
+  }
+
+  public List<BlockletEncodedColumnPage> getEncodedDimensionColumnPages() {
+    return encodedDimensionColumnPages;
+  }
+
+  public List<BlockletEncodedColumnPage> getEncodedMeasureColumnPages() {
+    return encodedMeasureColumnPages;
+  }
+
+  public int getNumberOfDimension() {
+    return encodedDimensionColumnPages.size();
+  }
+
+  public int getNumberOfMeasure() {
+    return encodedMeasureColumnPages.size();
+  }
+
+  public int getNumberOfPages() {
+    return this.numberOfPages;
+  }
+
+  public void clear() {
+    this.numberOfPages = 0;
+    this.encodedDimensionColumnPages = null;
+    this.blockletSize = 0;
+    this.encodedMeasureColumnPages = null;
+    this.pageMetadataList = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index 782a8df..fee114d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -242,7 +242,8 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
   }
 
   private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage,
-      ByteBuffer pageData, DataChunk2 pageMetadata, int offset) {
+      ByteBuffer pageData, DataChunk2 pageMetadata, int offset) throws IOException,
+      MemoryException {
     byte[] dataPage;
     int[] rlePage;
     int[] invertedIndexes = new int[0];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
index 911a260..99a7e57 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
@@ -37,11 +37,15 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
   private byte[] max;
 
   public BlockIndexerStorageForNoInvertedIndexForShort(byte[][] dataPage,
-      boolean isNoDictonary) {
+      boolean isNoDictonary, boolean isVarchar) {
     this.dataPage = dataPage;
     min = this.dataPage[0];
     max = this.dataPage[0];
     totalSize += this.dataPage[0].length;
+    int lVFormatLength = 2;
+    if (isVarchar) {
+      lVFormatLength = 4;
+    }
     int minCompare = 0;
     int maxCompare = 0;
     if (!isNoDictonary) {
@@ -60,9 +64,11 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
       for (int i = 1; i < this.dataPage.length; i++) {
         totalSize += this.dataPage[i].length;
         minCompare = ByteUtil.UnsafeComparer.INSTANCE
-            .compareTo(min, 2, min.length - 2, this.dataPage[i], 2, this.dataPage[i].length - 2);
+            .compareTo(min, lVFormatLength, min.length - lVFormatLength, this.dataPage[i],
+                lVFormatLength, this.dataPage[i].length - lVFormatLength);
         maxCompare = ByteUtil.UnsafeComparer.INSTANCE
-            .compareTo(max, 2, max.length - 2, this.dataPage[i], 2, this.dataPage[i].length - 2);
+            .compareTo(max, lVFormatLength, max.length - lVFormatLength, this.dataPage[i],
+                lVFormatLength, this.dataPage[i].length - lVFormatLength);
         if (minCompare > 0) {
           min = this.dataPage[i];
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 4dcf514..4ff1330 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -30,6 +30,8 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
 import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -59,7 +61,7 @@ public abstract class ColumnPage {
   private BitSet nullBitSet;
 
   // statistics collector for this column page
-  private ColumnPageStatsCollector statsCollector;
+  protected ColumnPageStatsCollector statsCollector;
 
   protected static final boolean unsafe = Boolean.parseBoolean(CarbonProperties.getInstance()
       .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
@@ -79,32 +81,8 @@ public abstract class ColumnPage {
     return dataType;
   }
 
-  private static final SimpleStatsResult statsForComplexType = new SimpleStatsResult() {
-    @Override public Object getMin() {
-      return new byte[0];
-    }
-
-    @Override public Object getMax() {
-      return new byte[0];
-    }
-
-    @Override public int getDecimalCount() {
-      return 0;
-    }
-
-    @Override public DataType getDataType() {
-      return BYTE_ARRAY;
-    }
-
-  };
-
   public SimpleStatsResult getStatistics() {
-    if (statsCollector != null) {
-      return statsCollector.getPageStats();
-    } else {
-      // TODO: for sub column of complex type, there no stats yet, return a dummy result
-      return statsForComplexType;
-    }
+    return statsCollector.getPageStats();
   }
 
   public int getPageSize() {
@@ -184,6 +162,19 @@ public abstract class ColumnPage {
     return newPage(columnSpec, dataType, pageSize);
   }
 
+  public static ColumnPage newLocalDictPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
+      int pageSize, LocalDictionaryGenerator localDictionaryGenerator) throws MemoryException {
+    if (unsafe) {
+      return new LocalDictColumnPage(new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize),
+          new UnsafeVarLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize),
+          localDictionaryGenerator);
+    } else {
+      return new LocalDictColumnPage(new SafeVarLengthColumnPage(columnSpec, dataType, pageSize),
+          new SafeVarLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize),
+          localDictionaryGenerator);
+    }
+  }
+
   /**
    * Create a new page of dataType and number of row = pageSize
    */
@@ -675,6 +666,9 @@ public abstract class ColumnPage {
    */
   public abstract void convertValue(ColumnPageValueConverter codec);
 
+  public PageLevelDictionary getPageDictionary() {
+    throw new UnsupportedOperationException("Operation Not Supported");
+  }
   /**
    * Compress page data using specified compressor
    */
@@ -702,7 +696,9 @@ public abstract class ColumnPage {
       return compressor.compressByte(getComplexChildrenLVFlattenedBytePage());
     } else if (dataType == DataTypes.BYTE_ARRAY && (
         columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT
-            || columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY)) {
+            || columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY
+            || columnSpec.getColumnType() == ColumnType.PLAIN_LONG_VALUE
+            || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
       return compressor.compressByte(getComplexParentFlattenedBytePage());
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       return compressor.compressByte(getLVFlattenedBytePage());
@@ -742,8 +738,9 @@ public abstract class ColumnPage {
     } else if (storeDataType == DataTypes.DOUBLE) {
       double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
       return newDoublePage(columnSpec, doubleData);
-    } else if (storeDataType == DataTypes.BYTE_ARRAY
-        && columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
+    } else if (storeDataType == DataTypes.BYTE_ARRAY && (
+        columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE
+            || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
       return newComplexLVBytesPage(columnSpec, lvVarBytes,
           CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
@@ -756,6 +753,10 @@ public abstract class ColumnPage {
         && columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
       return newFixedByteArrayPage(columnSpec, lvVarBytes, CarbonCommonConstants.LONG_SIZE_IN_BYTE);
+    } else if (storeDataType == DataTypes.BYTE_ARRAY
+        && columnSpec.getColumnType() == ColumnType.PLAIN_LONG_VALUE) {
+      byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
+      return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE);
     } else if (storeDataType == DataTypes.BYTE_ARRAY) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
       return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE);
@@ -816,4 +817,16 @@ public abstract class ColumnPage {
   public TableSpec.ColumnSpec getColumnSpec() {
     return columnSpec;
   }
+
+  public boolean isLocalDictGeneratedPage() {
+    return false;
+  }
+
+  public void disableLocalDictEncoding() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  public PageLevelDictionary getColumnPageDictionary() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
index 07dc837..c6b650f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -17,74 +17,129 @@
 
 package org.apache.carbondata.core.datastore.page;
 
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
-import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.datastore.ColumnType;
-
-// Represent a complex column page, e.g. Array, Struct type column
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+/**
+ * Holds the complex column data and its children data
+ */
 public class ComplexColumnPage {
 
-  // Holds data for all rows in this page in columnar layout.
-  // After the complex data expand, it is of type byte[][], the first level array in the byte[][]
-  // representing a sub-column in the complex type, which can be retrieved by giving the depth
-  // of the complex type.
-  // TODO: further optimize it to make it more memory efficient
-  private List<ArrayList<byte[]>> complexColumnData;
-
-  // depth is the number of column after complex type is expanded. It is from 1 to N
-  private final int pageSize;
-
+  /**
+   * number of columns
+   */
   private int depth;
 
+  /**
+   * type of each column
+   */
   private List<ColumnType> complexColumnType;
 
-  public ComplexColumnPage(int pageSize, List<ColumnType> complexColumnType) {
-    this.pageSize = pageSize;
+  /**
+   * column page for each type
+   */
+  private ColumnPage[] columnPages;
+
+  /**
+   * to maintain the number of records added for each type
+   */
+  private int[] currentRowIdList;
+
+  public ComplexColumnPage(List<ColumnType> complexColumnType) {
     this.depth = complexColumnType.size();
-    complexColumnData = new ArrayList<>(depth);
-    for (int i = 0; i < depth; i++) {
-      complexColumnData.add(new ArrayList<byte[]>());
-    }
     this.complexColumnType = complexColumnType;
+    this.columnPages = new ColumnPage[this.depth];
+    this.currentRowIdList = new int[depth];
   }
 
-  public void putComplexData(int rowId, int depth, List<byte[]> value) {
-    assert (depth <= this.depth);
-    ArrayList<byte[]> subColumnPage = complexColumnData.get(depth);
-    subColumnPage.addAll(value);
-  }
-
-  // iterate on the sub-column after complex type is expanded, return columnar page of
-  // each sub-column
-  public Iterator<byte[][]> iterator() {
-
-    return new CarbonIterator<byte[][]>() {
-      private int index = 0;
-      @Override public boolean hasNext() {
-        return index < depth;
-      }
-
-      @Override public byte[][] next() {
-        // convert the subColumnPage from ArrayList<byte[]> to byte[][]
-        ArrayList<byte[]> subColumnPage = complexColumnData.get(index);
-        index++;
-        return subColumnPage.toArray(new byte[subColumnPage.size()][]);
+  /**
+   * below method will be used to initialize the column page of complex type
+   * @param columnToDictMap
+   * dictionary map
+   * @param columnNames
+   * list of columns
+   * @param pageSize
+   * number of records
+   * @throws MemoryException
+   * if memory is not sufficient
+   */
+  public void initialize(Map<String, LocalDictionaryGenerator> columnToDictMap,
+      List<String> columnNames, int pageSize) throws MemoryException {
+    for (int i = 0; i < this.columnPages.length; i++) {
+      LocalDictionaryGenerator localDictionaryGenerator = columnToDictMap.get(columnNames.get(i));
+      if (null == localDictionaryGenerator) {
+        TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
+            .newInstance(columnNames.get(i), DataTypes.BYTE_ARRAY, complexColumnType.get(i));
+        this.columnPages[i] = ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, pageSize);
+        this.columnPages[i].setStatsCollector(new DummyStatsCollector());
+      } else {
+        TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
+            .newInstance(columnNames.get(i), DataTypes.BYTE_ARRAY, complexColumnType.get(i));
+        this.columnPages[i] = ColumnPage
+            .newLocalDictPage(spec, DataTypes.BYTE_ARRAY, pageSize, localDictionaryGenerator);
+        this.columnPages[i].setStatsCollector(new DummyStatsCollector());
       }
-    };
+    }
   }
 
+  /**
+   *
+   * @return depth
+   */
   public int getDepth() {
     return depth;
   }
 
-  public int getPageSize() {
-    return pageSize;
-  }
-
+  /**
+   * return the type of complex column
+   * @param isDepth
+   * @return complex column type
+   */
   public ColumnType getComplexColumnType(int isDepth) {
     return complexColumnType.get(isDepth);
   }
+
+  /**
+   * method to add complex column data
+   * @param depth
+   * depth of column
+   * @param dataList
+   * dataList
+   */
+  public void putComplexData(int depth, List<byte[]> dataList) {
+    assert (depth <= this.depth);
+    int currentNumber = currentRowIdList[depth];
+    for (int i = 0; i < dataList.size(); i++) {
+      columnPages[depth].putData(currentNumber, dataList.get(i));
+      currentNumber++;
+    }
+    currentRowIdList[depth] = currentNumber;
+  }
+
+  /**
+   * to free the used memory
+   */
+  public void freeMemory() {
+    for (int i = 0; i < depth; i++) {
+      columnPages[i].freeMemory();
+    }
+  }
+
+  /**
+   * return the column page
+   * @param depth
+   * depth of column
+   * @return column page
+   */
+  public ColumnPage getColumnPage(int depth) {
+    assert (depth <= this.depth);
+    return columnPages[depth];
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java
new file mode 100644
index 0000000..32846a1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.page;
+
+import java.util.concurrent.Callable;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
+
+/**
+ * This class is used to encode column pages for which a local dictionary was generated
+ * when not all the pages of that column in the blocklet were encoded with local dictionary.
+ * This is required because all the pages of a column in a blocklet must be either
+ * local dictionary encoded or encoded without local dictionary.
+ */
+public class FallbackColumnPageEncoder implements Callable<FallbackEncodedColumnPage> {
+
+  /**
+   * actual local dictionary generated column page
+   */
+  private EncodedColumnPage encodedColumnPage;
+
+  /**
+   * actual index in the page
+   * this is required as in a blocklet few pages will be local dictionary
+   * encoded and few pages will be plain text encoding
+   * in this case local dictionary encoded page
+   */
+  private int pageIndex;
+
+  public FallbackColumnPageEncoder(EncodedColumnPage encodedColumnPage, int pageIndex) {
+    this.encodedColumnPage = encodedColumnPage;
+    this.pageIndex = pageIndex;
+  }
+
+  @Override public FallbackEncodedColumnPage call() throws Exception {
+    // disable encoding using local dictionary
+    encodedColumnPage.getActualPage().disableLocalDictEncoding();
+    // new encoded column page
+    EncodedColumnPage newEncodedColumnPage;
+
+    // get column spec for existing column page
+    TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec();
+    switch (columnSpec.getColumnType()) {
+      case COMPLEX_ARRAY:
+      case COMPLEX_PRIMITIVE:
+      case COMPLEX_STRUCT:
+      case COMPLEX:
+        // for complex type column
+        newEncodedColumnPage = ColumnPageEncoder.encodedColumn(
+            encodedColumnPage.getActualPage());
+        break;
+      default:
+        // for primitive column
+        ColumnPageEncoder columnPageEncoder = DefaultEncodingFactory.getInstance()
+            .createEncoder(encodedColumnPage.getActualPage().getColumnSpec(),
+                encodedColumnPage.getActualPage());
+        newEncodedColumnPage = columnPageEncoder.encode(encodedColumnPage.getActualPage());
+    }
+    FallbackEncodedColumnPage fallbackEncodedColumnPage =
+        new FallbackEncodedColumnPage(newEncodedColumnPage, pageIndex);
+    // here freeing the memory of raw column page as fallback is done and column page will not
+    // be used.
+    // This is required to free the memory once it is of no use
+    encodedColumnPage.freeMemory();
+    return fallbackEncodedColumnPage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackEncodedColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackEncodedColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackEncodedColumnPage.java
new file mode 100644
index 0000000..9ce87b5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackEncodedColumnPage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.page;
+
+import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
+
+/**
+ * Maintains the fallback encoded page and metadata
+ */
+public class FallbackEncodedColumnPage {
+
+  /**
+   * encoded page
+   */
+  private EncodedColumnPage encodedColumnPage;
+
+  /**
+   * page index in a blocklet
+   */
+  private int pageIndex;
+
+  public FallbackEncodedColumnPage(EncodedColumnPage encodedColumnPage, int pageIndex) {
+    this.encodedColumnPage = encodedColumnPage;
+    this.pageIndex = pageIndex;
+  }
+
+  public EncodedColumnPage getEncodedColumnPage() {
+    return encodedColumnPage;
+  }
+
+  public int getPageIndex() {
+    return pageIndex;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
new file mode 100644
index 0000000..2c7d3a7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
+import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Column page implementation for Local dictionary generated columns
+ * It is a decorator over two column pages
+ * 1. Which will hold the actual data
+ * 2. Which will hold the dictionary encoded data
+ */
+public class LocalDictColumnPage extends ColumnPage {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LocalDictColumnPage.class.getName());
+
+  /**
+   * to maintain page level dictionary for column page
+   */
+  private PageLevelDictionary pageLevelDictionary;
+
+  /**
+   * to hold the actual data of the column
+   */
+  private ColumnPage actualDataColumnPage;
+
+  /**
+   * to hold the dictionary encoded column page
+   */
+  private ColumnPage encodedDataColumnPage;
+
+  /**
+   * to check if actual column page memory is already clear
+   */
+  private boolean isActualPageMemoryFreed;
+
+  /**
+   * Create a new column page with input data type and page size.
+   */
+  protected LocalDictColumnPage(ColumnPage actualDataColumnPage, ColumnPage encodedColumnpage,
+      LocalDictionaryGenerator localDictionaryGenerator) {
+    super(actualDataColumnPage.getColumnSpec(), actualDataColumnPage.getDataType(),
+        actualDataColumnPage.getPageSize());
+    // if threshold is not reached then create page level dictionary
+    // for encoding with local dictionary
+    if (!localDictionaryGenerator.isThresholdReached()) {
+      pageLevelDictionary = new PageLevelDictionary(localDictionaryGenerator,
+          actualDataColumnPage.getColumnSpec().getFieldName(), actualDataColumnPage.getDataType());
+      this.encodedDataColumnPage = encodedColumnpage;
+    } else {
+      // else free the encoded column page memory as it is of no use
+      encodedColumnpage.freeMemory();
+    }
+    this.actualDataColumnPage = actualDataColumnPage;
+  }
+
+  @Override public byte[][] getByteArrayPage() {
+    if (null != pageLevelDictionary) {
+      return encodedDataColumnPage.getByteArrayPage();
+    } else {
+      return actualDataColumnPage.getByteArrayPage();
+    }
+  }
+
+  /**
+   * Below method will be used to check whether page is local dictionary
+   * generated or not. This will be used while encoding the page
+   *
+   * @return
+   */
+  public boolean isLocalDictGeneratedPage() {
+    return null != pageLevelDictionary;
+  }
+
+  /**
+   * Below method will be used to add column data to page
+   *
+   * @param rowId row number
+   * @param bytes actual data
+   */
+  @Override public void putBytes(int rowId, byte[] bytes) {
+    if (null != pageLevelDictionary) {
+      try {
+        actualDataColumnPage.putBytes(rowId, bytes);
+        int dictionaryValue = pageLevelDictionary.getDictionaryValue(bytes);
+        encodedDataColumnPage.putBytes(rowId, ByteUtil.toBytes(dictionaryValue));
+      } catch (DictionaryThresholdReachedException e) {
+        LOGGER.error(e, "Local Dictionary threshold reached for the column: " + actualDataColumnPage
+            .getColumnSpec().getFieldName());
+        pageLevelDictionary = null;
+        encodedDataColumnPage.freeMemory();
+        encodedDataColumnPage = null;
+      }
+    } else {
+      actualDataColumnPage.putBytes(rowId, bytes);
+    }
+  }
+
+  @Override public void disableLocalDictEncoding() {
+    pageLevelDictionary = null;
+    freeEncodedColumnPage();
+  }
+
+  @Override public PageLevelDictionary getColumnPageDictionary() {
+    return pageLevelDictionary;
+  }
+
+  @Override public void setBytePage(byte[] byteData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setShortPage(short[] shortData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setShortIntPage(byte[] shortIntData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setIntPage(int[] intData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setLongPage(long[] longData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setFloatPage(float[] floatData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setDoublePage(double[] doubleData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setByteArrayPage(byte[][] byteArray) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void freeMemory() {
+    if (null == pageLevelDictionary) {
+      actualDataColumnPage.freeMemory();
+      isActualPageMemoryFreed = true;
+    }
+  }
+
+  public void freeMemoryForce() {
+    if (!isActualPageMemoryFreed) {
+      actualDataColumnPage.freeMemory();
+      isActualPageMemoryFreed = true;
+    }
+    freeEncodedColumnPage();
+  }
+
+  private void freeEncodedColumnPage() {
+    if (null != encodedDataColumnPage) {
+      encodedDataColumnPage.freeMemory();
+      encodedDataColumnPage = null;
+    }
+  }
+
+  @Override public void putByte(int rowId, byte value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putShort(int rowId, short value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putLong(int rowId, long value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putDouble(int rowId, double value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putDecimal(int rowId, BigDecimal decimal) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putShortInt(int rowId, int value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public byte getByte(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public short getShort(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public int getShortInt(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public int getInt(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public long getLong(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public float getFloat(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public double getDouble(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public BigDecimal getDecimal(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public byte[] getBytes(int rowId) {
+    return actualDataColumnPage.getBytes(rowId);
+  }
+
+  @Override public byte[] getBytePage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public short[] getShortPage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public byte[] getShortIntPage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public int[] getIntPage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public long[] getLongPage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public float[] getFloatPage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public double[] getDoublePage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public byte[] getLVFlattenedBytePage() throws IOException {
+    if (null != encodedDataColumnPage) {
+      return encodedDataColumnPage.getLVFlattenedBytePage();
+    } else {
+      return actualDataColumnPage.getLVFlattenedBytePage();
+    }
+  }
+
+  @Override public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+    if (null != encodedDataColumnPage) {
+      return encodedDataColumnPage.getComplexChildrenLVFlattenedBytePage();
+    } else {
+      return actualDataColumnPage.getComplexChildrenLVFlattenedBytePage();
+    }
+  }
+
+  @Override public byte[] getComplexParentFlattenedBytePage() throws IOException {
+    if (null != encodedDataColumnPage) {
+      return encodedDataColumnPage.getComplexParentFlattenedBytePage();
+    } else {
+      return actualDataColumnPage.getComplexParentFlattenedBytePage();
+    }
+  }
+
+  @Override public byte[] getDecimalPage() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void convertValue(ColumnPageValueConverter codec) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index 7b1ad20..c2eb40c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -21,6 +21,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -28,11 +30,11 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   // for string and decimal data
-  private byte[][] byteArrayData;
+  private List<byte[]> byteArrayData;
 
   SafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
     super(columnSpec, dataType, pageSize);
-    byteArrayData = new byte[pageSize][];
+    byteArrayData = new ArrayList<>();
   }
 
   @Override
@@ -42,13 +44,12 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   public void putBytesAtRow(int rowId, byte[] bytes) {
-    byteArrayData[rowId] = bytes;
+    byteArrayData.add(bytes);
   }
 
   @Override
   public void putBytes(int rowId, byte[] bytes, int offset, int length) {
-    byteArrayData[rowId] = new byte[length];
-    System.arraycopy(bytes, offset, byteArrayData[rowId], 0, length);
+    byteArrayData.add(bytes);
   }
 
   @Override public void putDecimal(int rowId, BigDecimal decimal) {
@@ -62,12 +63,14 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   public byte[] getBytes(int rowId) {
-    return byteArrayData[rowId];
+    return byteArrayData.get(rowId);
   }
 
   @Override
   public void setByteArrayPage(byte[][] byteArray) {
-    byteArrayData = byteArray;
+    for (byte[] data : byteArray) {
+      byteArrayData.add(data);
+    }
   }
 
   @Override
@@ -104,12 +107,12 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   public byte[][] getByteArrayPage() {
-    return byteArrayData;
+    return byteArrayData.toArray(new byte[byteArrayData.size()][]);
   }
 
   @Override
   void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
-    System.arraycopy(byteArrayData[rowId], 0, dest, destOffset, length);
+    System.arraycopy(byteArrayData.get(rowId), 0, dest, destOffset, length);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
index 1cdefc8..7449da6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
@@ -168,7 +168,7 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
       throw new RuntimeException(e);
     }
     CarbonUnsafe.getUnsafe().copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET + offset, baseAddress,
-        baseOffset + rowOffset[rowId], length);
+        baseOffset + rowOffset.get(rowId), length);
   }
 
   @Override
@@ -193,9 +193,9 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
 
   @Override
   public byte[] getBytes(int rowId) {
-    int length = rowOffset[rowId + 1] - rowOffset[rowId];
+    int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId);
     byte[] bytes = new byte[length];
-    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId),
         bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
     return bytes;
   }
@@ -242,9 +242,9 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
     } else if (dataType == DataTypes.LONG) {
       value = getLong(rowId);
     } else {
-      int length = rowOffset[rowId + 1] - rowOffset[rowId];
+      int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId);
       byte[] bytes = new byte[length];
-      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], bytes,
+      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId), bytes,
           CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
       return decimalConverter.getDecimal(bytes);
     }
@@ -253,7 +253,7 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
 
   @Override
   void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
-    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], dest,
+    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId), dest,
         CarbonUnsafe.BYTE_ARRAY_OFFSET + destOffset, length);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index 9d6e161..f60e505 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -65,7 +65,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
       throw new RuntimeException(e);
     }
     CarbonUnsafe.getUnsafe().copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET + offset,
-        baseAddress, baseOffset + rowOffset[rowId], length);
+        baseAddress, baseOffset + rowOffset.get(rowId), length);
   }
 
   @Override
@@ -89,20 +89,20 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   public byte[] getBytes(int rowId) {
-    int length = rowOffset[rowId + 1] - rowOffset[rowId];
+    int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId);
     byte[] bytes = new byte[length];
-    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId),
         bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
     return bytes;
   }
 
   @Override
   public byte[][] getByteArrayPage() {
-    byte[][] bytes = new byte[pageSize][];
-    for (int rowId = 0; rowId < pageSize; rowId++) {
-      int length = rowOffset[rowId + 1] - rowOffset[rowId];
+    byte[][] bytes = new byte[rowOffset.size() - 1][];
+    for (int rowId = 0; rowId < rowOffset.size() - 1; rowId++) {
+      int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId);
       byte[] rowData = new byte[length];
-      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId),
           rowData, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
       bytes[rowId] = rowData;
     }
@@ -111,7 +111,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
-    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId),
         dest, CarbonUnsafe.BYTE_ARRAY_OFFSET + destOffset, length);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index cb907a5..bd49b94 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -53,7 +53,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   Object baseAddress;
 
   // the offset of row in the unsafe memory, its size is pageSize + 1
-  int[] rowOffset;
+  List<Integer> rowOffset;
 
   // the length of bytes added in the page
   int totalLength;
@@ -66,7 +66,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
   VarLengthColumnPageBase(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
     super(columnSpec, dataType, pageSize);
-    rowOffset = new int[pageSize + 1];
+    rowOffset = new ArrayList<>();
     totalLength = 0;
   }
 
@@ -160,9 +160,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
     // set total length and rowOffset in page
     page.totalLength = offset;
-    page.rowOffset = new int[rowId + 1];
-    for (int i = 0; i < rowId + 1; i++) {
-      page.rowOffset[i] = rowOffset.get(i);
+    page.rowOffset = new ArrayList<>();
+    for (int i = 0; i < rowOffset.size(); i++) {
+      page.rowOffset.add(rowOffset.get(i));
     }
     for (int i = 0; i < rowId; i++) {
       page.putBytes(i, lvEncodedBytes, i * size, size);
@@ -240,9 +240,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
     // set total length and rowOffset in page
     page.totalLength = offset;
-    page.rowOffset = new int[rowId + 1];
-    for (int i = 0; i < rowId + 1; i++) {
-      page.rowOffset[i] = rowOffset.get(i);
+    page.rowOffset = new ArrayList<>();
+    for (int i = 0; i < rowOffset.size(); i++) {
+      page.rowOffset.add(rowOffset.get(i));
     }
 
     // set data in page
@@ -296,9 +296,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
           + " exceed this limit at rowId " + rowId);
     }
     if (rowId == 0) {
-      rowOffset[0] = 0;
+      rowOffset.add(0);
     }
-    rowOffset[rowId + 1] = rowOffset[rowId] + bytes.length;
+    rowOffset.add(rowOffset.get(rowId) + bytes.length);
     putBytesAtRow(rowId, bytes);
     totalLength += bytes.length;
   }
@@ -379,7 +379,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     int offset = 0;
     byte[] data = new byte[totalLength];
     for (int rowId = 0; rowId < pageSize; rowId++) {
-      int length = rowOffset[rowId + 1] - rowOffset[rowId];
+      int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId);
       copyBytes(rowId, data, offset, length);
       offset += length;
     }
@@ -395,9 +395,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   public byte[] getLVFlattenedBytePage() throws IOException {
     // output LV encoded byte array
     int offset = 0;
-    byte[] data = new byte[totalLength + pageSize * 4];
-    for (int rowId = 0; rowId < pageSize; rowId++) {
-      int length = rowOffset[rowId + 1] - rowOffset[rowId];
+    byte[] data = new byte[totalLength + ((rowOffset.size() - 1) * 4)];
+    for (int rowId = 0; rowId < rowOffset.size() - 1; rowId++) {
+      int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId);
       ByteUtil.setInt(data, offset, length);
       copyBytes(rowId, data, offset + 4, length);
       offset += 4 + length;
@@ -408,9 +408,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   @Override public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
     // output LV encoded byte array
     int offset = 0;
-    byte[] data = new byte[totalLength + pageSize * 2];
-    for (int rowId = 0; rowId < pageSize; rowId++) {
-      short length = (short) (rowOffset[rowId + 1] - rowOffset[rowId]);
+    byte[] data = new byte[totalLength + ((rowOffset.size() - 1) * 2)];
+    for (int rowId = 0; rowId < rowOffset.size() - 1; rowId++) {
+      short length = (short) (rowOffset.get(rowId + 1) - rowOffset.get(rowId));
       ByteUtil.setShort(data, offset, length);
       copyBytes(rowId, data, offset + 2, length);
       offset += 2 + length;
@@ -423,8 +423,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     // output LV encoded byte array
     int offset = 0;
     byte[] data = new byte[totalLength];
-    for (int rowId = 0; rowId < pageSize; rowId++) {
-      short length = (short) (rowOffset[rowId + 1] - rowOffset[rowId]);
+    for (int rowId = 0; rowId < rowOffset.size() - 1; rowId++) {
+      short length = (short) (rowOffset.get(rowId + 1) - rowOffset.get(rowId));
       copyBytes(rowId, data, offset, length);
       offset += length;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index 8bff5cc..f53024a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -22,11 +22,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.ColumnType;
-import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -39,6 +36,8 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
+import org.apache.carbondata.format.LocalDictionaryChunk;
+import org.apache.carbondata.format.LocalDictionaryChunkMeta;
 import org.apache.carbondata.format.PresenceMeta;
 
 public abstract class ColumnPageEncoder {
@@ -56,7 +55,7 @@ public abstract class ColumnPageEncoder {
   public EncodedColumnPage encode(ColumnPage inputPage) throws IOException, MemoryException {
     byte[] encodedBytes = encodeData(inputPage);
     DataChunk2 pageMetadata = buildPageMetadata(inputPage, encodedBytes);
-    return new EncodedColumnPage(pageMetadata, encodedBytes, inputPage.getStatistics());
+    return new EncodedColumnPage(pageMetadata, encodedBytes, inputPage);
   }
 
   private DataChunk2 buildPageMetadata(ColumnPage inputPage, byte[] encodedBytes)
@@ -138,22 +137,39 @@ public abstract class ColumnPageEncoder {
       throws IOException, MemoryException {
     EncodedColumnPage[] encodedPages = new EncodedColumnPage[input.getDepth()];
     int index = 0;
-    Iterator<byte[][]> iterator = input.iterator();
-    while (iterator.hasNext()) {
-      byte[][] subColumnPage = iterator.next();
-      encodedPages[index] = encodeChildColumn(subColumnPage, input.getComplexColumnType(index));
+    while (index < input.getDepth()) {
+      ColumnPage subColumnPage = input.getColumnPage(index);
+      encodedPages[index] = encodedColumn(subColumnPage);
       index++;
     }
     return encodedPages;
   }
 
-  private static EncodedColumnPage encodeChildColumn(byte[][] data, ColumnType complexDataType)
+  public static EncodedColumnPage encodedColumn(ColumnPage page)
       throws IOException, MemoryException {
-    TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
-        .newInstance("complex_inner_column", DataTypes.BYTE_ARRAY, complexDataType);
-    ColumnPage page = ColumnPage.wrapByteArrayPage(spec, data);
     ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null);
     return encoder.encode(page);
   }
 
+  /**
+   * Encodes the dictionary page into a local dictionary chunk. The page data is encoded
+   * first; the encoding list and the encoder meta are then stored as the chunk metadata.
+   * NOTE(review): encode() also encodes the data before building metadata - keep this order.
+   *
+   * @param dictionaryPage dictionary column page
+   * @return local dictionary chunk holding the encoded data and its metadata
+   * @throws IOException problem in encoding
+   * @throws MemoryException problem in encoding
+   */
+  public LocalDictionaryChunk encodeDictionary(ColumnPage dictionaryPage)
+      throws IOException, MemoryException {
+    LocalDictionaryChunk localDictionaryChunk = new LocalDictionaryChunk();
+    localDictionaryChunk.setDictionary_data(encodeData(dictionaryPage));
+    LocalDictionaryChunkMeta localDictionaryChunkMeta = new LocalDictionaryChunkMeta();
+    localDictionaryChunkMeta.setEncoders(getEncodingList());
+    localDictionaryChunkMeta.setEncoder_meta(buildEncoderMeta(dictionaryPage));
+    localDictionaryChunk.setDictionary_meta(localDictionaryChunkMeta);
+    return localDictionaryChunk;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
index 43d6fc6..6f78d95 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
@@ -19,7 +19,10 @@ package org.apache.carbondata.core.datastore.page.encoding;
 
 import java.nio.ByteBuffer;
 
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.LocalDictColumnPage;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 
@@ -34,8 +37,7 @@ public class EncodedColumnPage {
   // metadata of this page
   private DataChunk2 pageMetadata;
 
-  // stats of this page
-  private SimpleStatsResult stats;
+  private ColumnPage actualPage;
 
   /**
    * Constructor
@@ -43,7 +45,7 @@ public class EncodedColumnPage {
    * @param encodedData encoded data for this page
    */
   public EncodedColumnPage(DataChunk2 pageMetadata, byte[] encodedData,
-      SimpleStatsResult stats) {
+      ColumnPage actualPage) {
     if (pageMetadata == null) {
       throw new IllegalArgumentException("data chunk2 must not be null");
     }
@@ -52,7 +54,7 @@ public class EncodedColumnPage {
     }
     this.pageMetadata = pageMetadata;
     this.encodedData = encodedData;
-    this.stats = stats;
+    this.actualPage = actualPage;
   }
 
   /**
@@ -76,6 +78,25 @@ public class EncodedColumnPage {
   }
 
   public SimpleStatsResult getStats() {
-    return stats;
+    return actualPage.getStatistics();
+  }
+
+  public ColumnPage getActualPage() {
+    return actualPage;
+  }
+
+  public boolean isLocalDictGeneratedPage() {
+    return actualPage.isLocalDictGeneratedPage();
+  }
+
+  public PageLevelDictionary getPageDictionary() {
+    return actualPage.getColumnPageDictionary();
+  }
+
+  public void freeMemory() {
+    if (actualPage instanceof LocalDictColumnPage) {
+      LocalDictColumnPage page = (LocalDictColumnPage) actualPage;
+      page.freeMemoryForce();
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
index d157654..5694817 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
@@ -51,7 +51,7 @@ public class DictDimensionIndexCodec extends IndexStorageCodec {
         if (isInvertedIndex) {
           indexStorage = new BlockIndexerStorageForShort(data, true, false, isSort);
         } else {
-          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false, false);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         super.compressedDataPage = compressor.compressByte(flattened);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
index 1e5015b..17a523c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
@@ -52,7 +52,7 @@ public class DirectDictDimensionIndexCodec extends IndexStorageCodec {
         if (isInvertedIndex) {
           indexStorage = new BlockIndexerStorageForShort(data, false, false, isSort);
         } else {
-          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false, false);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         super.compressedDataPage = compressor.compressByte(flattened);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
index 741dbfe..c68f394 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
@@ -55,10 +55,12 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
       protected void encodeIndexStorage(ColumnPage input) {
         IndexStorage indexStorage;
         byte[][] data = input.getByteArrayPage();
+        boolean isDictionary = input.isLocalDictGeneratedPage();
         if (isInvertedIndex) {
-          indexStorage = new BlockIndexerStorageForShort(data, false, true, isSort);
+          indexStorage = new BlockIndexerStorageForShort(data, isDictionary, !isDictionary, isSort);
         } else {
-          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
+          indexStorage =
+              new BlockIndexerStorageForNoInvertedIndexForShort(data, !isDictionary, false);
         }
         byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
         super.compressedDataPage = compressor.compressByte(flattened);
@@ -75,8 +77,6 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
         }
         return encodings;
       }
-
     };
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java
new file mode 100644
index 0000000..a8bc5f1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE_ARRAY;
+
+/**
+ * No-op column page stats collector. This is used for pages for which stats generation
+ * is not required, for example complex type columns.
+ */
+public class DummyStatsCollector implements ColumnPageStatsCollector {
+
+  /**
+   * dummy stats used to sync with encoder: empty min/max, no decimals, BYTE_ARRAY type
+   */
+  protected static final SimpleStatsResult DUMMY_STATS = new SimpleStatsResult() {
+    @Override public Object getMin() {
+      return new byte[0];
+    }
+
+    @Override public Object getMax() {
+      return new byte[0];
+    }
+
+    @Override public int getDecimalCount() {
+      return 0;
+    }
+
+    @Override public DataType getDataType() {
+      return BYTE_ARRAY;
+    }
+
+  };
+
+  @Override public void updateNull(int rowId) {
+    // no-op: this collector does not track null rows
+  }
+
+  @Override public void update(byte value) {
+    // no-op: the value is ignored
+  }
+
+  @Override public void update(short value) {
+    // no-op: the value is ignored
+  }
+
+  @Override public void update(int value) {
+    // no-op: the value is ignored
+  }
+
+  @Override public void update(long value) {
+    // no-op: the value is ignored
+  }
+
+  @Override public void update(double value) {
+    // no-op: the value is ignored
+  }
+
+  @Override public void update(BigDecimal value) {
+    // no-op: the value is ignored
+  }
+
+  @Override public void update(byte[] value) {
+    // no-op: the value is ignored
+  }
+
+  @Override public SimpleStatsResult getPageStats() {
+    return DUMMY_STATS;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7103397/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
new file mode 100644
index 0000000..3ea36ef
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.localdictionary;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.datastore.ColumnType;
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
+import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector;
+import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.format.LocalDictionaryChunk;
+
+/**
+ * Maintains the page level dictionary: it records every dictionary surrogate value used
+ * in a page. This is required while writing the blocklet level dictionary in the carbondata
+ * file, for which the values of all pages are merged and encoded into one chunk.
+ */
+public class PageLevelDictionary {
+
+  /** dictionary generator to generate dictionary values for page data */
+  private LocalDictionaryGenerator localDictionaryGenerator;
+
+  /** set of dictionary surrogate keys used in this page, one bit per surrogate value */
+  private BitSet usedDictionaryValues;
+
+  /** name of the column, used to build the column spec of the dictionary page */
+  private String columnName;
+
+  /** data type of the column; VARCHAR selects the PLAIN_LONG_VALUE column type */
+  private DataType dataType;
+
+  /** Creates a page level dictionary with an empty set of used dictionary values. */
+  public PageLevelDictionary(LocalDictionaryGenerator localDictionaryGenerator, String columnName,
+      DataType dataType) {
+    this.localDictionaryGenerator = localDictionaryGenerator;
+    this.usedDictionaryValues = new BitSet();
+    this.columnName = columnName;
+    this.dataType = dataType;
+  }
+
+  /**
+   * Generates the dictionary value for the given data and marks it as used in this page.
+   *
+   * @param data column data
+   * @return dictionary value
+   * @throws DictionaryThresholdReachedException when threshold crossed for column
+   */
+  public int getDictionaryValue(byte[] data) throws DictionaryThresholdReachedException {
+    int dictionaryValue = localDictionaryGenerator.generateDictionary(data);
+    this.usedDictionaryValues.set(dictionaryValue);
+    return dictionaryValue;
+  }
+
+  /**
+   * Merges the dictionary values used by another page into this one. The result is the
+   * union of both sets, so that every value used by any merged page is retained.
+   *
+   * @param pageLevelDictionary other page level dictionary
+   */
+  public void mergerDictionaryValues(PageLevelDictionary pageLevelDictionary) {
+    // union, not intersection: and() would drop values that are not used in every page
+    usedDictionaryValues.or(pageLevelDictionary.usedDictionaryValues);
+  }
+
+  /**
+   * Builds the local dictionary chunk for writing: the keys of all used dictionary values
+   * are encoded as a byte array page and the compressed set of used values is attached.
+   * TODO Support for numeric data type dictionary exclude columns
+   *
+   * @return encoded local dictionary chunk
+   * @throws MemoryException in case of problem in encoding
+   * @throws IOException in case of problem in encoding
+   */
+  public LocalDictionaryChunk getLocalDictionaryChunkForBlocklet()
+      throws MemoryException, IOException {
+    // TODO support for actual data type dictionary ColumnSPEC
+    ColumnType columnType = ColumnType.PLAIN_VALUE;
+    if (DataTypes.VARCHAR == dataType) {
+      columnType = ColumnType.PLAIN_LONG_VALUE;
+    }
+    TableSpec.ColumnSpec spec =
+        TableSpec.ColumnSpec.newInstance(columnName, DataTypes.BYTE_ARRAY, columnType);
+    ColumnPage dictionaryColumnPage =
+        ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, usedDictionaryValues.cardinality());
+    // TODO support data type specific stats collector for numeric data types
+    dictionaryColumnPage.setStatsCollector(new DummyStatsCollector());
+    int rowId = 0;
+    for (int i = usedDictionaryValues.nextSetBit(0);
+         i >= 0; i = usedDictionaryValues.nextSetBit(i + 1)) {
+      dictionaryColumnPage
+          .putData(rowId++, localDictionaryGenerator.getDictionaryKeyBasedOnValue(i));
+    }
+    // create an encoder and encode the dictionary values
+    ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null);
+    LocalDictionaryChunk localDictionaryChunk = encoder.encodeDictionary(dictionaryColumnPage);
+    // set compressed dictionary values
+    localDictionaryChunk.setDictionary_values(CompressorFactory.getInstance().getCompressor()
+        .compressByte(usedDictionaryValues.toByteArray()));
+    // free the dictionary page memory
+    dictionaryColumnPage.freeMemory();
+    return localDictionaryChunk;
+  }
+}