You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/28 06:10:06 UTC

[6/7] carbondata git commit: [CARBONDATA-1268] Support encoding strategy for dimension columns

[CARBONDATA-1268] Support encoding strategy for dimension columns

In this PR, dimension encoding is changed to use EncodingStrategy instead of being hard-coded.
In the future, dimension encoding can be adjusted by extending EncodingStrategy.

This closes #1136


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a5af0ff2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a5af0ff2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a5af0ff2

Branch: refs/heads/master
Commit: a5af0ff238230bf64c8ac987bec9977d3f081ff2
Parents: bc3e684
Author: jackylk <ja...@huawei.com>
Authored: Thu Jul 13 09:21:30 2017 +0800
Committer: Raghunandan S <ca...@gmail.com>
Committed: Fri Jul 28 01:06:03 2017 +0800

----------------------------------------------------------------------
 .../carbondata/core/datastore/TableSpec.java    | 275 +++++++------------
 ...CompressedMeasureChunkFileBasedReaderV1.java |   2 +-
 ...CompressedMeasureChunkFileBasedReaderV2.java |   2 +-
 ...CompressedMeasureChunkFileBasedReaderV3.java |   2 +-
 .../core/datastore/page/ColumnPage.java         |   3 +
 .../core/datastore/page/ComplexColumnPage.java  |   4 +-
 .../page/encoding/AdaptiveCompressionCodec.java |   6 +
 .../page/encoding/ColumnPageCodec.java          |   7 +
 .../encoding/ComplexDimensionIndexCodec.java    |  74 +++++
 .../page/encoding/DefaultEncodingStrategy.java  |  37 ++-
 .../page/encoding/DictDimensionIndexCodec.java  |  65 +++++
 .../page/encoding/DirectCompressCodec.java      |   6 +
 .../encoding/DirectDictDimensionIndexCodec.java |  66 +++++
 .../page/encoding/EncodingStrategy.java         |   8 +-
 .../HighCardDictDimensionIndexCodec.java        |  66 +++++
 .../page/encoding/IndexStorageCodec.java        |  48 ++++
 .../schema/table/column/CarbonDimension.java    |   4 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |   9 +-
 .../store/CarbonFactDataHandlerColumnar.java    |   6 +-
 .../carbondata/processing/store/TablePage.java  | 218 ++++-----------
 .../util/CarbonDataProcessorUtil.java           |  20 ++
 21 files changed, 576 insertions(+), 352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index 87c4934..f1d3546 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -25,216 +25,145 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 
 public class TableSpec {
 
-  // contains name and type for each dimension
-  private DimensionSpec dimensionSpec;
-  // contains name and type for each measure
-  private MeasureSpec measureSpec;
+  // column spec for each dimension and measure
+  private DimensionSpec[] dimensionSpec;
+  private MeasureSpec[] measureSpec;
 
-  public TableSpec(List<CarbonDimension> dimensions, List<CarbonMeasure> measures) {
-    dimensionSpec = new DimensionSpec(dimensions);
-    measureSpec = new MeasureSpec(measures);
-  }
-
-  public DimensionSpec getDimensionSpec() {
-    return dimensionSpec;
-  }
-
-  public MeasureSpec getMeasureSpec() {
-    return measureSpec;
-  }
+  // number of simple dimensions
+  private int numSimpleDimensions;
 
-  public class DimensionSpec {
-
-    // field name of each dimension, in schema order
-    private String[] fieldName;
-
-    // encoding type of each dimension, in schema order
-    private DimensionType[] types;
-
-    // number of simple dimensions
-    private int numSimpleDimensions;
-
-    // number of complex dimensions
-    private int numComplexDimensions;
-
-    // number of dimensions after complex column expansion
-    private int numDimensionExpanded;
-
-    DimensionSpec(List<CarbonDimension> dimensions) {
-      // first calculate total number of columnar field considering column group and complex column
-      numDimensionExpanded = 0;
-      numSimpleDimensions = 0;
-      numComplexDimensions = 0;
-      boolean inColumnGroup = false;
-      for (CarbonDimension dimension : dimensions) {
-        if (dimension.isColumnar()) {
-          if (inColumnGroup) {
-            inColumnGroup = false;
-          }
-          if (dimension.isComplex()) {
-            numDimensionExpanded += dimension.getNumDimensionsExpanded();
-            numComplexDimensions++;
-          } else {
-            numDimensionExpanded++;
-            numSimpleDimensions++;
-          }
-        } else {
-          // column group
-          if (!inColumnGroup) {
-            inColumnGroup = true;
-            numDimensionExpanded++;
-            numSimpleDimensions++;
-          }
+  public TableSpec(List<CarbonDimension> dimensions, List<CarbonMeasure> measures) {
+    // first calculate total number of columnar field considering column group and complex column
+    numSimpleDimensions = 0;
+    for (CarbonDimension dimension : dimensions) {
+      if (dimension.isColumnar()) {
+        if (!dimension.isComplex()) {
+          numSimpleDimensions++;
         }
+      } else {
+        throw new UnsupportedOperationException("column group is not supported");
       }
+    }
+    dimensionSpec = new DimensionSpec[dimensions.size()];
+    measureSpec = new MeasureSpec[measures.size()];
+    addDimensions(dimensions);
+    addMeasures(measures);
+  }
 
-      // then extract dimension name and type for each column
-      fieldName = new String[numDimensionExpanded];
-      types = new DimensionType[numDimensionExpanded];
-      inColumnGroup = false;
-      int index = 0;
-      for (CarbonDimension dimension : dimensions) {
-        if (dimension.isColumnar()) {
-          if (inColumnGroup) {
-            inColumnGroup = false;
-          }
-          if (dimension.isComplex()) {
-            int count = addDimension(index, dimension);
-            index += count;
-          } else if (dimension.getDataType() == DataType.TIMESTAMP ||
-                     dimension.getDataType() == DataType.DATE) {
-            addSimpleDimension(index++, dimension.getColName(), DimensionType.DIRECT_DICTIONARY);
-          } else if (dimension.isGlobalDictionaryEncoding()) {
-            addSimpleDimension(index++, dimension.getColName(), DimensionType.GLOBAL_DICTIONARY);
-          } else {
-            addSimpleDimension(index++, dimension.getColName(), DimensionType.PLAIN_VALUE);
-          }
+  private void addDimensions(List<CarbonDimension> dimensions) {
+    int dimIndex = 0;
+    for (int i = 0; i < dimensions.size(); i++) {
+      CarbonDimension dimension = dimensions.get(i);
+      if (dimension.isColumnar()) {
+        if (dimension.isComplex()) {
+          DimensionSpec spec = new DimensionSpec(DimensionType.COMPLEX, dimension);
+          dimensionSpec[dimIndex++] = spec;
+        } else if (dimension.isDirectDictionaryEncoding()) {
+          DimensionSpec spec = new DimensionSpec(DimensionType.DIRECT_DICTIONARY, dimension);
+          dimensionSpec[dimIndex++] = spec;
+        } else if (dimension.isGlobalDictionaryEncoding()) {
+          DimensionSpec spec = new DimensionSpec(DimensionType.GLOBAL_DICTIONARY, dimension);
+          dimensionSpec[dimIndex++] = spec;
         } else {
-          // column group
-          if (!inColumnGroup) {
-            addSimpleDimension(index++, dimension.getColName(), DimensionType.COLUMN_GROUP);
-            inColumnGroup = true;
-          }
+          DimensionSpec spec = new DimensionSpec(DimensionType.PLAIN_VALUE, dimension);
+          dimensionSpec[dimIndex++] = spec;
         }
       }
     }
+  }
 
-    private void addSimpleDimension(int index, String name, DimensionType type) {
-      fieldName[index] = name;
-      types[index] = type;
+  private void addMeasures(List<CarbonMeasure> measures) {
+    for (int i = 0; i < measures.size(); i++) {
+      CarbonMeasure measure = measures.get(i);
+      measureSpec[i] = new MeasureSpec(measure.getColName(), measure.getDataType());
     }
+  }
 
-    // add dimension and return number of columns added
-    private int addDimension(int index, CarbonDimension dimension) {
-      switch (dimension.getDataType()) {
-        case ARRAY:
-          addSimpleDimension(index, dimension.getColName() + ".offset", DimensionType.COMPLEX);
-          List<CarbonDimension> arrayChildren = dimension.getListOfChildDimensions();
-          int count = 1;
-          for (CarbonDimension child : arrayChildren) {
-            count += addDimension(index + count, child);
-          }
-          return count;
-        case STRUCT:
-          addSimpleDimension(index, dimension.getColName() + ".empty", DimensionType.COMPLEX);
-          List<CarbonDimension> structChildren = dimension.getListOfChildDimensions();
-          count = 1;
-          for (CarbonDimension child : structChildren) {
-            count += addDimension(index + count, child);
-          }
-          return count;
-        case TIMESTAMP:
-        case DATE:
-          addSimpleDimension(index, dimension.getColName(), DimensionType.DIRECT_DICTIONARY);
-          return 1;
-        default:
-          addSimpleDimension(index, dimension.getColName(),
-              dimension.isGlobalDictionaryEncoding() ?
-                  DimensionType.GLOBAL_DICTIONARY : DimensionType.PLAIN_VALUE);
-          return 1;
-      }
-    }
+  public DimensionSpec getDimensionSpec(int dimensionIndex) {
+    return dimensionSpec[dimensionIndex];
+  }
 
+  public MeasureSpec getMeasureSpec(int measureIndex) {
+    return measureSpec[measureIndex];
+  }
 
-    /**
-     * return the dimension type of index'th dimension. index is from 0 to numDimensions
-     */
-    public DimensionType getType(int index) {
-      assert (index >= 0 && index < types.length);
-      return types[index];
+  public int getNumSimpleDimensions() {
+    return numSimpleDimensions;
+  }
+
+  /**
+   * return number of dimensions, including complex dimensions
+   */
+  public int getNumDimensions() {
+    return dimensionSpec.length;
+  }
+
+  /**
+   * return number of measures
+   */
+  public int getNumMeasures() {
+    return measureSpec.length;
+  }
-    }
-
-    /**
-     * return number of dimensions
-     */
-    public int getNumSimpleDimensions() {
-      return numSimpleDimensions;
-    }
+
+  /**
+   * Field name and data type of one column, shared by dimension and measure specs.
+   */
+  public class ColumnSpec {
+    // field name of this column
+    private String fieldName;
+
+    // data type of this column
+    private DataType dataType;
 
-    public int getNumComplexDimensions() {
-      return numComplexDimensions;
+    ColumnSpec(String fieldName, DataType dataType) {
+      this.fieldName = fieldName;
+      this.dataType = dataType;
     }
 
-    public int getNumExpandedDimensions() {
-      return numDimensionExpanded;
+    public DataType getDataType() {
+      return dataType;
     }
 
+    public String getFieldName() {
+      return fieldName;
+    }
   }
 
-  public class MeasureSpec {
+  public class DimensionSpec extends ColumnSpec {
 
-    // field name of each measure, in schema order
-    private String[] fieldName;
+    // dimension type of this dimension
+    private DimensionType type;
 
-    // data type of each measure, in schema order
-    private DataType[] types;
+    // indicate whether this dimension is in sort column
+    private boolean inSortColumns;
 
-    private int[] scale;
+    // indicate whether this dimension need to do inverted index
+    private boolean doInvertedIndex;
 
-    private int[] precision;
-
-    MeasureSpec(List<CarbonMeasure> measures) {
-      fieldName = new String[measures.size()];
-      types = new DataType[measures.size()];
-      scale = new int[measures.size()];
-      precision = new int[measures.size()];
-      int i = 0;
-      for (CarbonMeasure measure: measures) {
-        add(i++, measure.getColName(), measure.getDataType(), measure.getScale(),
-            measure.getPrecision());
-      }
+    DimensionSpec(DimensionType dimensionType, CarbonDimension dimension) {
+      super(dimension.getColName(), dimension.getDataType());
+      this.type = dimensionType;
+      this.inSortColumns = dimension.isSortColumn();
+      this.doInvertedIndex = dimension.isUseInvertedIndex();
     }
 
-    private void add(int index, String name, DataType type, int scale, int precision) {
-      fieldName[index] = name;
-      types[index] = type;
-      this.scale[index] = scale;
-      this.precision[index] = precision;
+    public DimensionType getDimensionType() {
+      return type;
     }
 
-    /**
-     * return the data type of index'th measure. index is from 0 to numMeasures
-     */
-    public DataType getType(int index) {
-      assert (index >= 0 && index < types.length);
-      return types[index];
+    public boolean isInSortColumns() {
+      return inSortColumns;
     }
 
-    public int getScale(int index) {
-      assert (index >= 0 && index < precision.length);
-      return scale[index];
+    public boolean isDoInvertedIndex() {
+      return doInvertedIndex;
     }
+  }
 
-    public int getPrecision(int index) {
-      assert (index >= 0 && index < precision.length);
-      return precision[index];
-    }
+  public class MeasureSpec extends ColumnSpec {
 
-    /**
-     * return number of measures
-     */
-    public int getNumMeasures() {
-      return types.length;
+    MeasureSpec(String fieldName, DataType dataType) {
+      super(fieldName, dataType);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index 8f69a7c..6bf65da 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -98,7 +98,7 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
     DataChunk dataChunk = measureColumnChunks.get(blockIndex);
     ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0);
 
-    ColumnPageCodec codec = strategy.createCodec(meta, -1, -1);
+    ColumnPageCodec codec = strategy.newCodec(meta);
     ColumnPage page = codec.decode(measureRawColumnChunk.getRawData().array(),
         measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength());
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index 09f367a..7511b6e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -135,7 +135,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     byte[] encodedMeta = measureColumnChunk.getEncoder_meta().get(0).array();
 
     ValueEncoderMeta meta = CarbonUtil.deserializeEncoderMetaV3(encodedMeta);
-    ColumnPageCodec codec = strategy.createCodec(meta);
+    ColumnPageCodec codec = strategy.newCodec(meta);
     byte[] rawData = measureRawColumnChunk.getRawData().array();
     return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index 492d46a..1881791 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -232,7 +232,7 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
 
     ColumnPageCodecMeta meta = new ColumnPageCodecMeta();
     meta.deserialize(encodedMeta);
-    ColumnPageCodec codec = strategy.createCodec(meta);
+    ColumnPageCodec codec = strategy.newCodec(meta);
     byte[] rawData = measureRawColumnChunk.getRawData().array();
     return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 180c092..90300d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -183,6 +183,9 @@ public abstract class ColumnPage {
         case DECIMAL:
           instance = newDecimalPage(new byte[pageSize][], scale, precision);
           break;
+        case BYTE_ARRAY:
+          instance = new SafeVarLengthColumnPage(dataType, pageSize);
+          break;
         default:
           throw new RuntimeException("Unsupported data dataType: " + dataType);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
index d9b8e54..5698e39 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
 
-
 // Represent a complex column page, e.g. Array, Struct type column
 public class ComplexColumnPage {
 
@@ -77,4 +76,7 @@ public class ComplexColumnPage {
     return depth;
   }
 
+  public int getPageSize() {
+    return pageSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
index 6b3a365..7ae606f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -60,6 +61,11 @@ public abstract class AdaptiveCompressionCodec implements ColumnPageCodec {
   public abstract ColumnPage decode(byte[] input, int offset, int length) throws MemoryException;
 
   @Override
+  public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
   public String toString() {
     return String.format("%s[src type: %s, target type: %s, stats(%s)]",
         getClass().getName(), srcDataType, targetDataType, stats);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
index a77bf69..ac7a79e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.page.encoding;
 import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
 
 /**
@@ -39,6 +40,12 @@ public interface ColumnPageCodec {
   EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException;
 
   /**
+   * encode complex column page and return the coded data
+   * TODO: remove this interface after complex column page is unified with column page
+   */
+  EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input);
+
+  /**
    * decode byte array from offset to a column page
    * @param input encoded byte array
    * @param offset startoffset of the input to decode

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ComplexDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ComplexDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ComplexDimensionIndexCodec.java
new file mode 100644
index 0000000..12efba4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ComplexDimensionIndexCodec.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.core.datastore.DimensionType;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.ByteUtil;
+
+public class ComplexDimensionIndexCodec extends IndexStorageCodec {
+
+  ComplexDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
+    super(isSort, isInvertedIndex, compressor);
+  }
+
+  @Override
+  public String getName() {
+    return "ComplexDimensionIndexCodec";
+  }
+
+  @Override
+  public EncodedColumnPage encode(ColumnPage input) throws MemoryException {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
+  public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) {
+    EncodedColumnPage[] encodedPages = new EncodedColumnPage[input.getDepth()];
+    int index = 0;
+    Iterator<byte[][]> iterator = input.iterator();
+    while (iterator.hasNext()) {
+      byte[][] data = iterator.next();
+      encodedPages[index++] = encodeChildColumn(input.getPageSize(), data);
+    }
+    return encodedPages;
+  }
+
+  private EncodedColumnPage encodeChildColumn(int pageSize, byte[][] data) {
+    IndexStorage indexStorage;
+    if (version == ColumnarFormatVersion.V3) {
+      indexStorage = new BlockIndexerStorageForShort(data, false, false, false);
+    } else {
+      indexStorage = new BlockIndexerStorageForInt(data, false, false, false);
+    }
+    byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+    byte[] compressed = compressor.compressByte(flattened);
+    return new EncodedDimensionPage(pageSize, compressed, indexStorage,
+        DimensionType.COMPLEX);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
index d2d3a44..b9aac73 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.datastore.page.encoding;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
@@ -125,16 +126,46 @@ public class DefaultEncodingStrategy extends EncodingStrategy {
     }
   }
 
-  @Override ColumnPageCodec newCodecForFloatingType(SimpleStatsResult stats) {
+  @Override
+  ColumnPageCodec newCodecForFloatingType(SimpleStatsResult stats) {
     return DirectCompressCodec.newInstance(stats, compressor);
   }
 
   // for decimal, currently it is a very basic implementation
-  @Override ColumnPageCodec newCodecForDecimalType(SimpleStatsResult stats) {
+  @Override
+  ColumnPageCodec newCodecForDecimalType(SimpleStatsResult stats) {
     return DirectCompressCodec.newInstance(stats, compressor);
   }
 
-  @Override ColumnPageCodec newCodecForByteArrayType(SimpleStatsResult stats) {
+  @Override
+  ColumnPageCodec newCodecForByteArrayType(SimpleStatsResult stats) {
     return DirectCompressCodec.newInstance(stats, compressor);
   }
+
+  @Override
+  public ColumnPageCodec newCodec(TableSpec.DimensionSpec dimensionSpec) {
+    Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    switch (dimensionSpec.getDimensionType()) {
+      case GLOBAL_DICTIONARY:
+        return new DictDimensionIndexCodec(
+            dimensionSpec.isInSortColumns(),
+            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
+            compressor);
+      case DIRECT_DICTIONARY:
+        return new DirectDictDimensionIndexCodec(
+            dimensionSpec.isInSortColumns(),
+            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
+            compressor);
+      case PLAIN_VALUE:
+        return new HighCardDictDimensionIndexCodec(
+            dimensionSpec.isInSortColumns(),
+            dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
+            compressor);
+      case COMPLEX:
+        return new ComplexDimensionIndexCodec(false, false, compressor);
+      default:
+        throw new RuntimeException("unsupported dimension type: " +
+            dimensionSpec.getDimensionType());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DictDimensionIndexCodec.java
new file mode 100644
index 0000000..20b63ba
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DictDimensionIndexCodec.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.DimensionType;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Encodes pages of global-dictionary dimension columns. The page's byte-array rows are put into
+ * an index storage (with or without inverted index), and that storage's data page is flattened
+ * and compressed.
+ */
+public class DictDimensionIndexCodec extends IndexStorageCodec {
+
+  DictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
+    super(isSort, isInvertedIndex, compressor);
+  }
+
+  @Override
+  public String getName() {
+    return "DictDimensionIndexCodec";
+  }
+
+  @Override
+  public EncodedColumnPage encode(ColumnPage input) {
+    IndexStorage storage = newIndexStorage(input.getByteArrayPage());
+    byte[] compressedData = compressor.compressByte(ByteUtil.flatten(storage.getDataPage()));
+    return new EncodedDimensionPage(input.getPageSize(), compressedData, storage,
+        DimensionType.GLOBAL_DICTIONARY);
+  }
+
+  /**
+   * Picks the index storage for the page rows: short-based variants for format V3, int-based
+   * variants otherwise. The inverted-index variants are used only when isInvertedIndex is set.
+   */
+  private IndexStorage newIndexStorage(byte[][] rows) {
+    boolean isV3 = version == ColumnarFormatVersion.V3;
+    if (!isInvertedIndex) {
+      if (isV3) {
+        return new BlockIndexerStorageForNoInvertedIndexForShort(rows, false);
+      }
+      return new BlockIndexerStorageForNoInvertedIndexForInt(rows);
+    }
+    // second argument is the RLE-applicable flag, always enabled for global dictionary pages
+    if (isV3) {
+      return new BlockIndexerStorageForShort(rows, true, false, isSort);
+    }
+    return new BlockIndexerStorageForInt(rows, true, false, isSort);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
index a1d4b61..664926c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
@@ -58,6 +59,11 @@ public class DirectCompressCodec implements ColumnPageCodec {
   }
 
   @Override
+  public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
   public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
     ColumnPage page = ColumnPage
         .decompress(compressor, stats.getDataType(), input, offset, length, stats.getScale(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectDictDimensionIndexCodec.java
new file mode 100644
index 0000000..d3e5e66
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectDictDimensionIndexCodec.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.DimensionType;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Encodes pages of direct-dictionary dimension columns. The page's byte-array rows are put into
+ * an index storage (with or without inverted index), and that storage's data page is flattened
+ * and compressed. Unlike {@link DictDimensionIndexCodec}, RLE is not requested for these pages.
+ */
+public class DirectDictDimensionIndexCodec extends IndexStorageCodec {
+
+  DirectDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
+    super(isSort, isInvertedIndex, compressor);
+  }
+
+  @Override
+  public String getName() {
+    return "DirectDictDimensionIndexCodec";
+  }
+
+  /**
+   * Builds the index storage for the page, compresses its flattened data page and wraps the
+   * result as a DIRECT_DICTIONARY dimension page.
+   *
+   * @param input page whose rows are read via getByteArrayPage()
+   * @return the encoded dimension page
+   */
+  @Override
+  public EncodedColumnPage encode(ColumnPage input) throws MemoryException {
+    IndexStorage indexStorage;
+    byte[][] data = input.getByteArrayPage();
+    if (isInvertedIndex) {
+      // second argument is the RLE-applicable flag, disabled for direct dictionary pages
+      if (version == ColumnarFormatVersion.V3) {
+        indexStorage = new BlockIndexerStorageForShort(data, false, false, isSort);
+      } else {
+        indexStorage = new BlockIndexerStorageForInt(data, false, false, isSort);
+      }
+    } else {
+      if (version == ColumnarFormatVersion.V3) {
+        indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+      } else {
+        indexStorage = new BlockIndexerStorageForNoInvertedIndexForInt(data);
+      }
+    }
+    byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+    byte[] compressed = compressor.compressByte(flattened);
+    // tag the page with the dimension type this codec encodes (not GLOBAL_DICTIONARY)
+    return new EncodedDimensionPage(input.getPageSize(), compressed, indexStorage,
+        DimensionType.DIRECT_DICTIONARY);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
index ee13277..29219ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.datastore.page.encoding;
 
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
@@ -30,7 +31,7 @@ public abstract class EncodingStrategy {
   /**
    * create codec based on the page data type and statistics
    */
-  public ColumnPageCodec createCodec(SimpleStatsResult stats) {
+  public ColumnPageCodec newCodec(SimpleStatsResult stats) {
     switch (stats.getDataType()) {
       case BYTE:
       case SHORT:
@@ -53,7 +54,7 @@ public abstract class EncodingStrategy {
   /**
    * create codec based on the page data type and statistics contained by ValueEncoderMeta
    */
-  public ColumnPageCodec createCodec(ValueEncoderMeta meta, int scale, int precision) {
+  public ColumnPageCodec newCodec(ValueEncoderMeta meta, int scale, int precision) {
     if (meta instanceof ColumnPageCodecMeta) {
       ColumnPageCodecMeta codecMeta = (ColumnPageCodecMeta) meta;
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(codecMeta);
@@ -108,4 +109,7 @@ public abstract class EncodingStrategy {
   // for byte array
+  // Creates the codec for a byte-array page, given that page's statistics.
   abstract ColumnPageCodec newCodecForByteArrayType(SimpleStatsResult stats);
 
+  /**
+   * Creates the codec used to encode pages of the given dimension column.
+   *
+   * @param dimensionSpec specification of the dimension column, including its dimension type,
+   *                      whether it is a sort column and whether an inverted index is wanted
+   * @return codec for the column's pages
+   */
+  public abstract ColumnPageCodec newCodec(TableSpec.DimensionSpec dimensionSpec);
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java
new file mode 100644
index 0000000..c1620c6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.DimensionType;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Encodes pages of plain-value (PLAIN_VALUE) dimension columns. The page's byte-array rows are
+ * put into an index storage (with or without inverted index), and that storage's data page is
+ * flattened and compressed.
+ */
+public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
+
+  HighCardDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
+    super(isSort, isInvertedIndex, compressor);
+  }
+
+  @Override
+  public String getName() {
+    return "HighCardDictDimensionIndexCodec";
+  }
+
+  @Override
+  public EncodedColumnPage encode(ColumnPage input) throws MemoryException {
+    IndexStorage storage = newIndexStorage(input.getByteArrayPage());
+    byte[] compressedData = compressor.compressByte(ByteUtil.flatten(storage.getDataPage()));
+    return new EncodedDimensionPage(input.getPageSize(), compressedData, storage,
+        DimensionType.PLAIN_VALUE);
+  }
+
+  /**
+   * Picks the index storage for the page rows: short-based variants for format V3, int-based
+   * variants otherwise. The inverted-index variants are used only when isInvertedIndex is set.
+   */
+  private IndexStorage newIndexStorage(byte[][] rows) {
+    boolean isV3 = version == ColumnarFormatVersion.V3;
+    if (!isInvertedIndex) {
+      if (isV3) {
+        return new BlockIndexerStorageForNoInvertedIndexForShort(rows, true);
+      }
+      return new BlockIndexerStorageForNoInvertedIndexForInt(rows);
+    }
+    // second argument is the RLE-applicable flag, never enabled for plain-value pages
+    if (isV3) {
+      return new BlockIndexerStorageForShort(rows, false, true, isSort);
+    }
+    return new BlockIndexerStorageForInt(rows, false, true, isSort);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java
new file mode 100644
index 0000000..3122b15
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Base class for the dimension index codecs in this package. It holds the settings they share
+ * (columnar format version, compressor, sort flag, inverted-index flag). It also rejects the
+ * operations it does not implement, with messages that name the concrete codec.
+ */
+public abstract class IndexStorageCodec implements ColumnPageCodec {
+  // columnar format version, read from CarbonProperties when the codec is created
+  protected ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+  protected Compressor compressor;
+  // sort-column flag supplied by the creator of the codec
+  protected boolean isSort;
+  // whether an inverted index should be built for the page
+  protected boolean isInvertedIndex;
+
+  IndexStorageCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
+    this.isSort = isSort;
+    this.isInvertedIndex = isInvertedIndex;
+    this.compressor = compressor;
+  }
+
+  /**
+   * Not supported unless a subclass overrides it.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) {
+    throw new UnsupportedOperationException(
+        getName() + " does not support encoding complex columns");
+  }
+
+  /**
+   * Not supported unless a subclass overrides it.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
+    throw new UnsupportedOperationException(getName() + " does not support decode");
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
index dd01c56..b4c052f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
@@ -114,6 +114,10 @@ public class CarbonDimension extends CarbonColumn {
     this.complexTypeOrdinal = complexTypeOrdinal;
   }
 
+  /**
+   * Returns true if this dimension's encoder list contains {@code Encoding.DIRECT_DICTIONARY}.
+   */
+  public boolean isDirectDictionaryEncoding() {
+    return getEncoder().contains(Encoding.DIRECT_DICTIONARY);
+  }
+
+  /**
+   * Returns true if this dimension's encoder list contains {@code Encoding.DICTIONARY}.
+   * NOTE(review): direct-dictionary columns may also carry DICTIONARY; confirm before using this
+   * alone to tell global-dictionary columns apart from direct-dictionary ones.
+   */
   public boolean isGlobalDictionaryEncoding() {
     return getEncoder().contains(Encoding.DICTIONARY);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index db7717c..4c78fa9 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -706,22 +706,21 @@ object CarbonDataRDDFactory {
         }
       }
       // create new segment folder  in carbon store
-      if (!updateModel.isDefined) {
+      if (updateModel.isEmpty) {
         CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
           carbonLoadModel.getSegmentId, carbonTable)
       }
       var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
       var errorMessage: String = "DataLoad failure"
       var executorMessage: String = ""
-      val configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel)
-      val sortScope = CarbonDataProcessorUtil.getSortScope(configuration)
+      val isSortTable = carbonTable.getNumberOfSortColumns > 0
+      val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
       try {
         if (updateModel.isDefined) {
           loadDataFrameForUpdate()
         } else if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
           loadDataForPartitionTable()
-        } else if (configuration.isSortTable &&
-            sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
+        } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
           LOGGER.audit("Using global sort for loading.")
           status = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
             dataFrame, carbonLoadModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 6ed5d31..9c48af7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -56,7 +56,6 @@ import org.apache.carbondata.processing.store.file.FileManager;
 import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 /**
  * Fact data handler class to handle the fact data
@@ -173,9 +172,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
             CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
     if (isAggKeyBlock) {
       int[] dimLens = model.getSegmentProperties().getDimColumnsCardinality();
-      for (int i = 0; i < model.getTableSpec().getDimensionSpec().getNumSimpleDimensions(); i++) {
-        if (CarbonDataProcessorUtil
-            .isRleApplicableForColumn(model.getTableSpec().getDimensionSpec().getType(i))) {
+      for (int i = 0; i < model.getTableSpec().getNumSimpleDimensions(); i++) {
+        if (model.getSegmentProperties().getDimensions().get(i).isGlobalDictionaryEncoding()) {
           this.rleEncodingForDictDimension[i] = true;
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index c5a9bec..9881e8e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -22,25 +22,17 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.carbondata.core.datastore.DimensionType;
 import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
 import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingStrategy;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.EncodedDimensionPage;
 import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage;
 import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy;
@@ -54,10 +46,8 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 import org.apache.spark.sql.types.Decimal;
 
@@ -72,9 +62,9 @@ public class TablePage {
 
   // TODO: we should have separate class for key columns so that keys are stored together in
   // one vector to make it efficient for sorting
-  private ColumnPage[] dictDimensionPage;
-  private ColumnPage[] noDictDimensionPage;
-  private ComplexColumnPage[] complexDimensionPage;
+  // one BYTE_ARRAY page per dictionary dimension, sized by the MDKey generator's dimension count
+  private ColumnPage[] dictDimensionPages;
+  // one BYTE_ARRAY page per no-dictionary dimension; each value is stored length-prefixed
+  private ColumnPage[] noDictDimensionPages;
+  // one page per complex column; each entry stays null until row 0 of that column is added
+  private ComplexColumnPage[] complexDimensionPages;
+  // one page per measure, created with the model's measure data types
   private ColumnPage[] measurePage;
 
   // the num of rows in this page, it must be less than short value (65536)
@@ -90,23 +80,23 @@ public class TablePage {
     this.model = model;
     this.pageSize = pageSize;
     int numDictDimension = model.getMDKeyGenerator().getDimCount();
-    dictDimensionPage = new ColumnPage[numDictDimension];
-    for (int i = 0; i < dictDimensionPage.length; i++) {
-      ColumnPage page = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize);
+    dictDimensionPages = new ColumnPage[numDictDimension];
+    for (int i = 0; i < dictDimensionPages.length; i++) {
+      ColumnPage page = ColumnPage.newPage(DataType.BYTE_ARRAY, pageSize);
       page.setStatsCollector(VarLengthPageStatsCollector.newInstance());
-      dictDimensionPage[i] = page;
+      dictDimensionPages[i] = page;
     }
-    noDictDimensionPage = new ColumnPage[model.getNoDictionaryCount()];
-    for (int i = 0; i < noDictDimensionPage.length; i++) {
-      ColumnPage page = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize);
+    noDictDimensionPages = new ColumnPage[model.getNoDictionaryCount()];
+    for (int i = 0; i < noDictDimensionPages.length; i++) {
+      ColumnPage page = ColumnPage.newPage(DataType.BYTE_ARRAY, pageSize);
       page.setStatsCollector(VarLengthPageStatsCollector.newInstance());
-      noDictDimensionPage[i] = page;
+      noDictDimensionPages[i] = page;
     }
-    complexDimensionPage = new ComplexColumnPage[model.getComplexColumnCount()];
-    for (int i = 0; i < complexDimensionPage.length; i++) {
+    complexDimensionPages = new ComplexColumnPage[model.getComplexColumnCount()];
+    for (int i = 0; i < complexDimensionPages.length; i++) {
       // here we still do not the depth of the complex column, it will be initialized when
       // we get the first row.
-      complexDimensionPage[i] = null;
+      complexDimensionPages[i] = null;
     }
     measurePage = new ColumnPage[model.getMeasureCount()];
     DataType[] dataTypes = model.getMeasureDataType();
@@ -117,7 +107,7 @@ public class TablePage {
       page.setStatsCollector(PrimitivePageStatsCollector.newInstance(dataTypes[i], pageSize));
       measurePage[i] = page;
     }
-    boolean hasNoDictionary = noDictDimensionPage.length > 0;
+    boolean hasNoDictionary = noDictDimensionPages.length > 0;
     this.key = new TablePageKey(pageSize, model.getMDKeyGenerator(), model.getSegmentProperties(),
         hasNoDictionary);
   }
@@ -140,13 +130,13 @@ public class TablePage {
       throws KeyGenException {
     // 1. convert dictionary columns
     byte[][] keys = model.getSegmentProperties().getFixedLengthKeySplitter().splitKey(mdk);
-    for (int i = 0; i < dictDimensionPage.length; i++) {
-      dictDimensionPage[i].putData(rowId, keys[i]);
+    for (int i = 0; i < dictDimensionPages.length; i++) {
+      dictDimensionPages[i].putData(rowId, keys[i]);
     }
 
     // 2. convert noDictionary columns and complex columns.
-    int noDictionaryCount = noDictDimensionPage.length;
-    int complexColumnCount = complexDimensionPage.length;
+    int noDictionaryCount = noDictDimensionPages.length;
+    int complexColumnCount = complexDimensionPages.length;
     if (noDictionaryCount > 0 || complexColumnCount > 0) {
       byte[][] noDictAndComplex = WriteStepRowUtil.getNoDictAndComplexDimension(row);
       for (int i = 0; i < noDictAndComplex.length; i++) {
@@ -154,7 +144,7 @@ public class TablePage {
           // noDictionary columns, since it is variable length, we need to prepare each
           // element as LV result byte array (first two bytes are the length of the array)
           byte[] valueWithLength = addLengthToByteArray(noDictAndComplex[i]);
-          noDictDimensionPage[i].putData(rowId, valueWithLength);
+          noDictDimensionPages[i].putData(rowId, valueWithLength);
         } else {
           // complex columns
           addComplexColumn(i - noDictionaryCount, rowId, noDictAndComplex[i]);
@@ -194,13 +184,13 @@ public class TablePage {
     // initialize the page if first row
     if (rowId == 0) {
       int depthInComplexColumn = complexDataType.getColsCount();
-      complexDimensionPage[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
+      complexDimensionPages[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
     }
 
-    int depthInComplexColumn = complexDimensionPage[index].getDepth();
+    int depthInComplexColumn = complexDimensionPages[index].getDepth();
     // this is the result columnar data which will be added to page,
     // size of this list is the depth of complex column, we will fill it by input data
-    List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>();
+    List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>(depthInComplexColumn);
     for (int k = 0; k < depthInComplexColumn; k++) {
       encodedComplexColumnar.add(new ArrayList<byte[]>());
     }
@@ -221,15 +211,15 @@ public class TablePage {
     }
 
     for (int depth = 0; depth < depthInComplexColumn; depth++) {
-      complexDimensionPage[index].putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
+      complexDimensionPages[index].putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
     }
   }
 
   void freeMemory() {
-    for (ColumnPage page : dictDimensionPage) {
+    for (ColumnPage page : dictDimensionPages) {
       page.freeMemory();
     }
-    for (ColumnPage page : noDictDimensionPage) {
+    for (ColumnPage page : noDictDimensionPages) {
       page.freeMemory();
     }
     for (ColumnPage page : measurePage) {
@@ -264,146 +254,52 @@ public class TablePage {
       throws MemoryException, IOException {
     EncodedMeasurePage[] encodedMeasures = new EncodedMeasurePage[measurePage.length];
     for (int i = 0; i < measurePage.length; i++) {
-      SimpleStatsResult stats = (SimpleStatsResult)(measurePage[i].getStatistics());
-      ColumnPageCodec encoder = encodingStrategy.createCodec(stats);
+      ColumnPageCodec encoder =
+          encodingStrategy.newCodec((SimpleStatsResult)(measurePage[i].getStatistics()));
       encodedMeasures[i] = (EncodedMeasurePage) encoder.encode(measurePage[i]);
     }
     return encodedMeasures;
   }
 
-  private IndexStorage encodeAndCompressDictDimension(byte[][] data, boolean isSort,
-      boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException {
-    if (isUseInvertedIndex) {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort);
-      } else {
-        return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort);
-      }
-    } else {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
-      } else {
-        return new BlockIndexerStorageForNoInvertedIndexForInt(data);
-      }
-    }
-  }
-
-  private IndexStorage encodeAndCompressDirectDictDimension(byte[][] data, boolean isSort,
-      boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException {
-    if (isUseInvertedIndex) {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort);
-      } else {
-        return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort);
-      }
-    } else {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
-      } else {
-        return new BlockIndexerStorageForNoInvertedIndexForInt(data);
-      }
-    }
-  }
-
-  private IndexStorage encodeAndCompressComplexDimension(byte[][] data) {
-    if (version == ColumnarFormatVersion.V3) {
-      return new BlockIndexerStorageForShort(data, false, false, false);
-    } else {
-      return new BlockIndexerStorageForInt(data, false, false, false);
-    }
-  }
-
-  private IndexStorage encodeAndCompressNoDictDimension(byte[][] data, boolean isSort,
-      boolean isUseInvertedIndex, boolean isRleApplicable) {
-    if (isUseInvertedIndex) {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForShort(data, isRleApplicable, true, isSort);
-      } else {
-        return new BlockIndexerStorageForInt(data, isRleApplicable, true, isSort);
-      }
-    } else {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
-      } else {
-        return new BlockIndexerStorageForNoInvertedIndexForInt(data);
-      }
-    }
-  }
-
   // apply and compress each dimension, set encoded data in `encodedData`
   private EncodedDimensionPage[] encodeAndCompressDimensions()
-      throws KeyGenException {
-    TableSpec.DimensionSpec dimensionSpec = model.getTableSpec().getDimensionSpec();
-    int dictionaryColumnCount = -1;
-    int noDictionaryColumnCount = -1;
-    int indexStorageOffset = 0;
-    IndexStorage[] indexStorages = new IndexStorage[dimensionSpec.getNumExpandedDimensions()];
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
-    EncodedDimensionPage[] compressedColumns = new EncodedDimensionPage[indexStorages.length];
-    boolean[] isUseInvertedIndex = model.getIsUseInvertedIndex();
-    for (int i = 0; i < dimensionSpec.getNumSimpleDimensions(); i++) {
-      ColumnPage page;
-      byte[] flattened;
-      boolean isSortColumn = model.isSortColumn(i);
-      switch (dimensionSpec.getType(i)) {
+      throws KeyGenException, IOException, MemoryException {
+    List<EncodedDimensionPage> encodedDimensions = new ArrayList<>();
+    List<EncodedDimensionPage> encodedComplexDimenions = new ArrayList<>();
+    TableSpec tableSpec = model.getTableSpec();
+    int dictIndex = 0;
+    int noDictIndex = 0;
+    int complexDimIndex = 0;
+    int numDimensions = tableSpec.getNumDimensions();
+    for (int i = 0; i < numDimensions; i++) {
+      ColumnPageCodec codec;
+      EncodedDimensionPage encodedPage;
+      TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i);
+      switch (spec.getDimensionType()) {
         case GLOBAL_DICTIONARY:
-          // dictionary dimension
-          page = dictDimensionPage[++dictionaryColumnCount];
-          indexStorages[indexStorageOffset] = encodeAndCompressDictDimension(
-              page.getByteArrayPage(),
-              isSortColumn,
-              isUseInvertedIndex[i] & isSortColumn,
-              CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
-          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
-          break;
         case DIRECT_DICTIONARY:
-          // timestamp and date column
-          page = dictDimensionPage[++dictionaryColumnCount];
-          indexStorages[indexStorageOffset] = encodeAndCompressDirectDictDimension(
-              page.getByteArrayPage(),
-              isSortColumn,
-              isUseInvertedIndex[i] & isSortColumn,
-              CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
-          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
+          codec = encodingStrategy.newCodec(spec);
+          encodedPage = (EncodedDimensionPage) codec.encode(dictDimensionPages[dictIndex++]);
+          encodedDimensions.add(encodedPage);
           break;
         case PLAIN_VALUE:
-          // high cardinality dimension, encoded as plain string
-          page = noDictDimensionPage[++noDictionaryColumnCount];
-          indexStorages[indexStorageOffset] = encodeAndCompressNoDictDimension(
-              page.getByteArrayPage(),
-              isSortColumn,
-              isUseInvertedIndex[i] & isSortColumn,
-              CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
-          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
+          codec = encodingStrategy.newCodec(spec);
+          encodedPage = (EncodedDimensionPage) codec.encode(noDictDimensionPages[noDictIndex++]);
+          encodedDimensions.add(encodedPage);
           break;
         case COMPLEX:
-          // we need to add complex column at last, so skipping it here
-          continue;
-        default:
-          throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getType(i));
+          codec = encodingStrategy.newCodec(spec);
+          EncodedColumnPage[] encodedPages = codec.encodeComplexColumn(
+              complexDimensionPages[complexDimIndex++]);
+          for (EncodedColumnPage page : encodedPages) {
+            encodedComplexDimenions.add((EncodedDimensionPage) page);
+          }
+          break;
       }
-      byte[] compressedData = compressor.compressByte(flattened);
-      compressedColumns[indexStorageOffset] = new EncodedDimensionPage(
-          pageSize, compressedData, indexStorages[indexStorageOffset], dimensionSpec.getType(i));
-      SimpleStatsResult stats = (SimpleStatsResult) page.getStatistics();
-      compressedColumns[indexStorageOffset].setNullBitSet(stats.getNullBits());
-      indexStorageOffset++;
     }
 
-    // handle complex type column
-    for (int i = 0; i < dimensionSpec.getNumComplexDimensions(); i++) {
-      Iterator<byte[][]> iterator = complexDimensionPage[i].iterator();
-      while (iterator.hasNext()) {
-        byte[][] data = iterator.next();
-        indexStorages[indexStorageOffset] = encodeAndCompressComplexDimension(data);
-        byte[] flattened = ByteUtil.flatten(data);
-        byte[] compressedData = compressor.compressByte(flattened);
-        compressedColumns[indexStorageOffset] = new EncodedDimensionPage(
-            pageSize, compressedData, indexStorages[indexStorageOffset], DimensionType.COMPLEX);
-        indexStorageOffset++;
-      }
-    }
-    return compressedColumns;
+    encodedDimensions.addAll(encodedComplexDimenions);
+    return encodedDimensions.toArray(new EncodedDimensionPage[encodedDimensions.size()]);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5af0ff2/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 815c752..b46a42c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -488,6 +488,26 @@ public final class CarbonDataProcessorUtil {
     return sortScope;
   }
 
+  /** Resolves the sort scope: DDL value if non-null, else carbon property; default on error. */
+  public static SortScopeOptions.SortScope getSortScope(String sortScopeString) {
+    SortScopeOptions.SortScope sortScope;
+    try {
+      String scope = sortScopeString;
+      if (scope == null) {
+        scope = CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.LOAD_SORT_SCOPE, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT);
+      }
+      sortScope = SortScopeOptions.getSortScope(scope);
+      LOGGER.info("sort scope is set to " + sortScope);
+    } catch (Exception e) {
+      // best effort: fall back to the default instead of failing, but keep the reason in the log
+      sortScope = SortScopeOptions.getSortScope(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT);
+      LOGGER.warn("Exception occurred while resolving sort scope: " + e.getMessage()
+          + ". sort scope is set to " + sortScope);
+    }
+    return sortScope;
+  }
+
   /**
    * Get the batch sort size
    * @param configuration