Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/15 07:09:19 UTC

[34/52] [partial] incubator-carbondata git commit: Renamed packages to org.apache.carbondata and fixed errors

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/LoadStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/LoadStatistics.java b/core/src/main/java/org/apache/carbondata/core/util/LoadStatistics.java
new file mode 100644
index 0000000..9fb1e6e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/LoadStatistics.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+public interface LoadStatistics {
+  // Initialize partition info
+  void initPartitonInfo(String partitionId);
+
+  //Record the time
+  void recordDicShuffleAndWriteTime();
+
+  void recordLoadCsvfilesToDfTime();
+
+  void recordDictionaryValuesTotalTime(String partitionID,
+      Long dictionaryValuesTotalTimePoint);
+
+  void recordCsvInputStepTime(String partitionID,
+      Long csvInputStepTimePoint);
+
+  void recordLruCacheLoadTime(double lruCacheLoadTime);
+
+  void recordGeneratingDictionaryValuesTime(String partitionID,
+      Long generatingDictionaryValuesTimePoint);
+
+  void recordSortRowsStepTotalTime(String partitionID,
+      Long sortRowsStepTotalTimePoint);
+
+  void recordMdkGenerateTotalTime(String partitionID,
+      Long mdkGenerateTotalTimePoint);
+
+  void recordDictionaryValue2MdkAdd2FileTime(String partitionID,
+      Long dictionaryValue2MdkAdd2FileTimePoint);
+
+  //Record the node blocks information map
+  void recordHostBlockMap(String host, Integer numBlocks);
+
+  //Record the partition blocks information map
+  void recordPartitionBlockMap(String partitionID, Integer numBlocks);
+
+  //Record total num of records processed
+  void recordTotalRecords(long totalRecords);
+
+  //Print the statistics information
+  void printStatisticsInfo(String partitionID);
+
+}
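
For illustration, a minimal implementation of this interface might look like the
sketch below (a hypothetical class, not part of this commit, assuming the
interface package is on the classpath): it keeps only the record and block
counts and leaves every timing hook as a no-op.

    import java.util.HashMap;
    import java.util.Map;

    public class SimpleLoadStatistics implements LoadStatistics {
      private long totalRecords;
      private final Map<String, Integer> partitionBlocks = new HashMap<String, Integer>();
      private final Map<String, Integer> hostBlocks = new HashMap<String, Integer>();

      public void initPartitonInfo(String partitionId) { }
      public void recordDicShuffleAndWriteTime() { }
      public void recordLoadCsvfilesToDfTime() { }
      public void recordDictionaryValuesTotalTime(String partitionID, Long timePoint) { }
      public void recordCsvInputStepTime(String partitionID, Long timePoint) { }
      public void recordLruCacheLoadTime(double lruCacheLoadTime) { }
      public void recordGeneratingDictionaryValuesTime(String partitionID, Long timePoint) { }
      public void recordSortRowsStepTotalTime(String partitionID, Long timePoint) { }
      public void recordMdkGenerateTotalTime(String partitionID, Long timePoint) { }
      public void recordDictionaryValue2MdkAdd2FileTime(String partitionID, Long timePoint) { }

      // the map-based hooks just remember the latest counts per key
      public void recordHostBlockMap(String host, Integer numBlocks) {
        hostBlocks.put(host, numBlocks);
      }
      public void recordPartitionBlockMap(String partitionID, Integer numBlocks) {
        partitionBlocks.put(partitionID, numBlocks);
      }
      public void recordTotalRecords(long totalRecords) {
        this.totalRecords = totalRecords;
      }
      public void printStatisticsInfo(String partitionID) {
        System.out.println("partition " + partitionID + ": " + totalRecords
            + " records, " + partitionBlocks.get(partitionID) + " blocks");
      }
    }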

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
new file mode 100644
index 0000000..4a229d6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
@@ -0,0 +1,1027 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.store.compression.MeasureMetaDataModel;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressByteArray;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressDefaultLong;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinByte;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinDefault;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinFloat;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinInt;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinLong;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinShort;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalByte;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalDefault;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalFloat;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalInt;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalLong;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinByte;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinDefault;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinFloat;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinInt;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinLong;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinShort;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalShort;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNoneByte;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNoneDefault;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNoneFloat;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNoneInt;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNoneLong;
+import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressNoneShort;
+
+public final class ValueCompressionUtil {
+
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ValueCompressionUtil.class.getName());
+
+  private ValueCompressionUtil() {
+
+  }
+
+  /**
+   * Decide the actual type of a value.
+   *
+   * @param value            the measure value
+   * @param decimal          the number of decimal digits of the value
+   * @param dataTypeSelected when 1, a float representation is considered for decimal values
+   * @return the actual type of the value
+   */
+  private static DataType getDataType(double value, int decimal, byte dataTypeSelected) {
+    DataType dataType = DataType.DATA_DOUBLE;
+    if (decimal == 0) {
+      if (value < Byte.MAX_VALUE) {
+        dataType = DataType.DATA_BYTE;
+      } else if (value < Short.MAX_VALUE) {
+        dataType = DataType.DATA_SHORT;
+      } else if (value < Integer.MAX_VALUE) {
+        dataType = DataType.DATA_INT;
+      } else if (value < Long.MAX_VALUE) {
+        dataType = DataType.DATA_LONG;
+      }
+    } else {
+      if (dataTypeSelected == 1) {
+        if (value < Float.MAX_VALUE) {
+          float floatValue = (float) value;
+          if (floatValue - value != 0) {
+            dataType = DataType.DATA_DOUBLE;
+
+          } else {
+            dataType = DataType.DATA_FLOAT;
+          }
+        } else if (value < Double.MAX_VALUE) {
+          dataType = DataType.DATA_DOUBLE;
+        }
+      }
+    }
+    return dataType;
+  }
+
+  /**
+   * Gives the size in bytes of the given data type.
+   *
+   * @param dataType measure value type
+   * @return the size of the DataType in bytes
+   */
+  public static int getSize(DataType dataType) {
+
+    switch (dataType) {
+      case DATA_BYTE:
+        return 1;
+      case DATA_SHORT:
+        return 2;
+      case DATA_INT:
+      case DATA_FLOAT:
+        return 4;
+      default:
+        return 8;
+    }
+  }
+
+  /**
+   * Get the best compression type. Priority list, from high to low:
+   * COMPRESSION_TYPE.NONE, COMPRESSION_TYPE.MAX_MIN,
+   * COMPRESSION_TYPE.NON_DECIMAL_CONVERT, COMPRESSION_TYPE.MAX_MIN_NDC
+   *
+   * @param maxValue         max value of one measure
+   * @param minValue         min value of one measure
+   * @param decimal          number of decimal digits of one measure
+   * @param aggregatorType   aggregator type of the measure
+   * @param dataTypeSelected data type selection flag
+   * @return the best compression type
+   */
+  private static CompressionFinder getCompressionType(Object maxValue, Object minValue, int decimal,
+      char aggregatorType, byte dataTypeSelected) {
+    // 'c' for aggregate table, 'b' for BigDecimal, 'l' for long, 'n' for double
+    switch (aggregatorType) {
+      case 'c':
+        return new CompressionFinder(COMPRESSION_TYPE.CUSTOM, DataType.DATA_BYTE,
+            DataType.DATA_BYTE);
+      case 'b':
+        return new CompressionFinder(COMPRESSION_TYPE.CUSTOM_BIGDECIMAL, DataType.DATA_BYTE,
+            DataType.DATA_BYTE);
+      case 'l':
+        return new CompressionFinder(COMPRESSION_TYPE.NONE,
+                DataType.DATA_BIGINT, DataType.DATA_BIGINT);
+      default:
+        break;
+    }
+    // non-decimal case
+    if (decimal == 0) {
+      if (getSize(getDataType((double) maxValue, decimal, dataTypeSelected)) > getSize(
+          getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected))) {
+        return new CompressionFinder(COMPRESSION_TYPE.MAX_MIN, DataType.DATA_DOUBLE,
+            getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected));
+      } else if (getSize(getDataType((double) maxValue, decimal, dataTypeSelected)) < getSize(
+              getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected))) {
+        return new CompressionFinder(COMPRESSION_TYPE.NONE, DataType.DATA_DOUBLE,
+                getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected));
+      } else {
+        return new CompressionFinder(COMPRESSION_TYPE.NONE, DataType.DATA_DOUBLE,
+            getDataType((double) maxValue, decimal, dataTypeSelected));
+      }
+    }
+    // decimal case
+    else {
+      DataType actualDataType = getDataType((double) maxValue, decimal, dataTypeSelected);
+      DataType diffDataType =
+          getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected);
+      DataType maxNonDecDataType =
+          getDataType(Math.pow(10, decimal) * (double) maxValue, 0, dataTypeSelected);
+      DataType diffNonDecDataType =
+          getDataType(Math.pow(10, decimal) * ((double) maxValue - (double) minValue), 0,
+              dataTypeSelected);
+
+      CompressionFinder[] finders = new CompressionFinder[] {
+          new CompressionFinder(actualDataType, actualDataType,
+              CompressionFinder.PRIORITY.ACTUAL, COMPRESSION_TYPE.NONE),
+          new CompressionFinder(actualDataType, diffDataType,
+              CompressionFinder.PRIORITY.DIFFSIZE, COMPRESSION_TYPE.MAX_MIN),
+          new CompressionFinder(actualDataType, maxNonDecDataType,
+              CompressionFinder.PRIORITY.MAXNONDECIMAL, COMPRESSION_TYPE.NON_DECIMAL_CONVERT),
+          new CompressionFinder(actualDataType, diffNonDecDataType,
+              CompressionFinder.PRIORITY.DIFFNONDECIMAL, COMPRESSION_TYPE.MAX_MIN_NDC) };
+      // sort the CompressionFinders; the first element after sorting has the
+      // highest priority (smallest changed-type size, then lowest priority ordinal)
+      Arrays.sort(finders);
+      return finders[0];
+    }
+  }
+
+  /**
+   * @param compType        compression type
+   * @param values          the data of one measure
+   * @param changedDataType changed data type
+   * @param maxValue        the max value of one measure
+   * @param decimal         the decimal length of one measure
+   * @return the compressed data array
+   */
+  public static Object getCompressedValues(COMPRESSION_TYPE compType, double[] values,
+      DataType changedDataType, double maxValue, int decimal) {
+    Object o;
+    switch (compType) {
+      case NONE:
+
+        o = compressNone(changedDataType, values);
+        return o;
+
+      case MAX_MIN:
+
+        o = compressMaxMin(changedDataType, values, maxValue);
+        return o;
+
+      case NON_DECIMAL_CONVERT:
+
+        o = compressNonDecimal(changedDataType, values, decimal);
+        return o;
+
+      default:
+        o = compressNonDecimalMaxMin(changedDataType, values, decimal, maxValue);
+        return o;
+    }
+  }
+
+  public static Object getCompressedValues(COMPRESSION_TYPE compType, long[] values,
+      DataType changedDataType, long maxValue, int decimal) {
+    // long (big int) measures are currently written uncompressed
+    return values;
+  }
+
+  private static ValueCompressonHolder.UnCompressValue[] getUncompressedValues(
+      COMPRESSION_TYPE[] compType, DataType[] actualDataType, DataType[] changedDataType) {
+
+    ValueCompressonHolder.UnCompressValue[] compressValue =
+        new ValueCompressonHolder.UnCompressValue[changedDataType.length];
+    for (int i = 0; i < changedDataType.length; i++) {
+      switch (compType[i]) {
+        case NONE:
+
+          compressValue[i] = unCompressNone(changedDataType[i], actualDataType[i]);
+          break;
+
+        case MAX_MIN:
+
+          compressValue[i] = unCompressMaxMin(changedDataType[i], actualDataType[i]);
+          break;
+
+        case NON_DECIMAL_CONVERT:
+
+          compressValue[i] = unCompressNonDecimal(changedDataType[i], DataType.DATA_DOUBLE);
+          break;
+
+        case CUSTOM:
+          compressValue[i] = new UnCompressByteArray(UnCompressByteArray.ByteArrayType.BYTE_ARRAY);
+          break;
+
+        case CUSTOM_BIGDECIMAL:
+          compressValue[i] = new UnCompressByteArray(UnCompressByteArray.ByteArrayType.BIG_DECIMAL);
+          break;
+
+        default:
+          compressValue[i] = unCompressNonDecimalMaxMin(changedDataType[i], null);
+      }
+    }
+    return compressValue;
+
+  }
+
+  /**
+   * compress data to another type, for example double -> int
+   */
+  private static Object compressNone(DataType changedDataType, double[] value) {
+    int i = 0;
+    switch (changedDataType) {
+
+      case DATA_BYTE:
+
+        byte[] result = new byte[value.length];
+
+        for (double a : value) {
+          result[i] = (byte) a;
+          i++;
+        }
+        return result;
+
+      case DATA_SHORT:
+
+        short[] shortResult = new short[value.length];
+
+        for (double a : value) {
+          shortResult[i] = (short) a;
+          i++;
+        }
+        return shortResult;
+
+      case DATA_INT:
+
+        int[] intResult = new int[value.length];
+
+        for (double a : value) {
+          intResult[i] = (int) a;
+          i++;
+        }
+        return intResult;
+
+      case DATA_LONG:
+      case DATA_BIGINT:
+
+        long[] longResult = new long[value.length];
+
+        for (double a : value) {
+          longResult[i] = (long) a;
+          i++;
+        }
+        return longResult;
+
+      case DATA_FLOAT:
+
+        float[] floatResult = new float[value.length];
+
+        for (double a : value) {
+          floatResult[i] = (float) a;
+          i++;
+        }
+        return floatResult;
+
+      default:
+
+        return value;
+
+    }
+  }
+
+  /**
+   * compress data via the max-min delta, for example:
+   * 1. subValue = maxValue - value
+   * 2. subValue: double -> int
+   */
+  private static Object compressMaxMin(DataType changedDataType, double[] value, double maxValue) {
+    int i = 0;
+    switch (changedDataType) {
+      case DATA_BYTE:
+
+        byte[] result = new byte[value.length];
+        for (double a : value) {
+          result[i] = (byte) (maxValue - a);
+          i++;
+        }
+        return result;
+
+      case DATA_SHORT:
+
+        short[] shortResult = new short[value.length];
+
+        for (double a : value) {
+          shortResult[i] = (short) (maxValue - a);
+          i++;
+        }
+        return shortResult;
+
+      case DATA_INT:
+
+        int[] intResult = new int[value.length];
+
+        for (double a : value) {
+          intResult[i] = (int) (maxValue - a);
+          i++;
+        }
+        return intResult;
+
+      case DATA_LONG:
+
+        long[] longResult = new long[value.length];
+
+        for (double a : value) {
+          longResult[i] = (long) (maxValue - a);
+          i++;
+        }
+        return longResult;
+
+      case DATA_FLOAT:
+
+        float[] floatResult = new float[value.length];
+
+        for (double a : value) {
+          floatResult[i] = (float) (maxValue - a);
+          i++;
+        }
+        return floatResult;
+
+      default:
+
+        double[] defaultResult = new double[value.length];
+
+        for (double a : value) {
+          defaultResult[i] = (double) (maxValue - a);
+          i++;
+        }
+        return defaultResult;
+
+    }
+  }
+
+  /**
+   * compress data via decimal shifting, for example:
+   * 1. subValue = value * Math.pow(10, decimal)
+   * 2. subValue: double -> int
+   */
+  private static Object compressNonDecimal(DataType changedDataType, double[] value, int decimal) {
+    int i = 0;
+    switch (changedDataType) {
+      case DATA_BYTE:
+        byte[] result = new byte[value.length];
+
+        for (double a : value) {
+          result[i] = (byte) (Math.round(Math.pow(10, decimal) * a));
+          i++;
+        }
+        return result;
+      case DATA_SHORT:
+        short[] shortResult = new short[value.length];
+
+        for (double a : value) {
+          shortResult[i] = (short) (Math.round(Math.pow(10, decimal) * a));
+          i++;
+        }
+        return shortResult;
+      case DATA_INT:
+
+        int[] intResult = new int[value.length];
+
+        for (double a : value) {
+          intResult[i] = (int) (Math.round(Math.pow(10, decimal) * a));
+          i++;
+        }
+        return intResult;
+
+      case DATA_LONG:
+
+        long[] longResult = new long[value.length];
+
+        for (double a : value) {
+          longResult[i] = (long) (Math.round(Math.pow(10, decimal) * a));
+          i++;
+        }
+        return longResult;
+
+      case DATA_FLOAT:
+
+        float[] floatResult = new float[value.length];
+
+        for (double a : value) {
+          floatResult[i] = (float) (Math.round(Math.pow(10, decimal) * a));
+          i++;
+        }
+        return floatResult;
+
+      default:
+        double[] defaultResult = new double[value.length];
+
+        for (double a : value) {
+          defaultResult[i] = (double) (Math.round(Math.pow(10, decimal) * a));
+          i++;
+        }
+        return defaultResult;
+    }
+  }
+
+  /**
+   * compress data via max-min delta plus decimal shifting, for example:
+   * 1. subValue = maxValue - value
+   * 2. subValue = subValue * Math.pow(10, decimal)
+   * 3. subValue: double -> int
+   */
+  private static Object compressNonDecimalMaxMin(DataType changedDataType, double[] value,
+      int decimal, double maxValue) {
+    int i = 0;
+    switch (changedDataType) {
+      case DATA_BYTE:
+
+        byte[] result = new byte[value.length];
+
+        for (double a : value) {
+          result[i] = (byte) (Math.round((maxValue - a) * Math.pow(10, decimal)));
+          i++;
+        }
+        return result;
+
+      case DATA_SHORT:
+
+        short[] shortResult = new short[value.length];
+
+        for (double a : value) {
+          shortResult[i] = (short) (Math.round((maxValue - a) * Math.pow(10, decimal)));
+          i++;
+        }
+        return shortResult;
+
+      case DATA_INT:
+
+        int[] intResult = new int[value.length];
+
+        for (double a : value) {
+          intResult[i] = (int) (Math.round((maxValue - a) * Math.pow(10, decimal)));
+          i++;
+        }
+        return intResult;
+
+      case DATA_LONG:
+
+        long[] longResult = new long[value.length];
+
+        for (double a : value) {
+          longResult[i] = (long) (Math.round((maxValue - a) * Math.pow(10, decimal)));
+          i++;
+        }
+        return longResult;
+
+      case DATA_FLOAT:
+
+        float[] floatResult = new float[value.length];
+
+        for (double a : value) {
+          floatResult[i] = (float) (Math.round((maxValue - a) * Math.pow(10, decimal)));
+          i++;
+        }
+        return floatResult;
+
+      default:
+
+        double[] defaultResult = new double[value.length];
+
+        for (double a : value) {
+          defaultResult[i] = (double) (Math.round((maxValue - a) * Math.pow(10, decimal)));
+          i++;
+        }
+        return defaultResult;
+
+    }
+  }
+
+  /**
+   * uncompress data for example: int -> double
+   */
+  public static ValueCompressonHolder.UnCompressValue unCompressNone(DataType compDataType,
+      DataType actualDataType) {
+    if (actualDataType == DataType.DATA_BIGINT) {
+      return new UnCompressDefaultLong();
+    } else {
+      switch (compDataType) {
+        case DATA_BYTE:
+
+          return new UnCompressNoneByte();
+
+        case DATA_SHORT:
+
+          return new UnCompressNoneShort();
+
+        case DATA_INT:
+
+          return new UnCompressNoneInt();
+
+        case DATA_LONG:
+
+          return new UnCompressNoneLong();
+
+        case DATA_FLOAT:
+
+          return new UnCompressNoneFloat();
+
+        default:
+
+          return new UnCompressNoneDefault();
+      }
+    }
+  }
+
+  /**
+   * uncompress data 1. value = maxValue - subValue 2. value: int->double
+   */
+  public static ValueCompressonHolder.UnCompressValue unCompressMaxMin(DataType compDataType,
+      DataType actualDataType) {
+    switch (compDataType) {
+      case DATA_BYTE:
+
+        return new UnCompressMaxMinByte();
+
+      case DATA_SHORT:
+
+        return new UnCompressMaxMinShort();
+
+      case DATA_INT:
+
+        return new UnCompressMaxMinInt();
+
+      case DATA_LONG:
+
+        return new UnCompressMaxMinLong();
+
+      case DATA_FLOAT:
+
+        return new UnCompressMaxMinFloat();
+
+      default:
+
+        return new UnCompressMaxMinDefault();
+
+    }
+  }
+
+  /**
+   * uncompress data value = value/Math.pow(10, decimal)
+   */
+  public static ValueCompressonHolder.UnCompressValue unCompressNonDecimal(DataType compDataType,
+      DataType actualDataType) {
+    switch (compDataType) {
+      case DATA_BYTE:
+
+        return new UnCompressNonDecimalByte();
+
+      case DATA_SHORT:
+
+        return new UnCompressNonDecimalShort();
+
+      case DATA_INT:
+
+        return new UnCompressNonDecimalInt();
+
+      case DATA_LONG:
+
+        return new UnCompressNonDecimalLong();
+
+      case DATA_FLOAT:
+
+        return new UnCompressNonDecimalFloat();
+
+      default:
+
+        return new UnCompressNonDecimalDefault();
+
+    }
+  }
+
+  /**
+   * uncompress data value = (maxValue - subValue)/Math.pow(10, decimal)
+   */
+  public static ValueCompressonHolder.UnCompressValue unCompressNonDecimalMaxMin(
+      DataType compDataType, DataType actualDataType) {
+    switch (compDataType) {
+      case DATA_BYTE:
+
+        return new UnCompressNonDecimalMaxMinByte();
+
+      case DATA_SHORT:
+
+        return new UnCompressNonDecimalMaxMinShort();
+
+      case DATA_INT:
+
+        return new UnCompressNonDecimalMaxMinInt();
+
+      case DATA_LONG:
+
+        return new UnCompressNonDecimalMaxMinLong();
+
+      case DATA_FLOAT:
+
+        return new UnCompressNonDecimalMaxMinFloat();
+
+      default:
+
+        return new UnCompressNonDecimalMaxMinDefault();
+
+    }
+  }
+
+  /**
+   * Create value compression model.
+   *
+   * @param maxValue         max value of each measure
+   * @param minValue         min value of each measure
+   * @param decimalLength    decimal length of each measure
+   * @param uniqueValue      unique value of each measure
+   * @param aggType          aggregate type of each measure
+   * @param dataTypeSelected data type selection flag of each measure
+   * @return the value compression model
+   */
+  public static ValueCompressionModel getValueCompressionModel(Object[] maxValue, Object[] minValue,
+      int[] decimalLength, Object[] uniqueValue, char[] aggType, byte[] dataTypeSelected) {
+
+    MeasureMetaDataModel metaDataModel =
+        new MeasureMetaDataModel(minValue, maxValue, decimalLength, maxValue.length, uniqueValue,
+            aggType, dataTypeSelected);
+    return getValueCompressionModel(metaDataModel);
+  }
+
+  public static ValueCompressionModel getValueCompressionModel(MeasureMetaDataModel measureMDMdl) {
+    int measureCount = measureMDMdl.getMeasureCount();
+    Object[] minValue = measureMDMdl.getMinValue();
+    Object[] maxValue = measureMDMdl.getMaxValue();
+    Object[] uniqueValue = measureMDMdl.getUniqueValue();
+    int[] decimal = measureMDMdl.getDecimal();
+    char[] type = measureMDMdl.getType();
+    byte[] dataTypeSelected = measureMDMdl.getDataTypeSelected();
+    ValueCompressionModel compressionModel = new ValueCompressionModel();
+    DataType[] actualType = new DataType[measureCount];
+    DataType[] changedType = new DataType[measureCount];
+    COMPRESSION_TYPE[] compType = new COMPRESSION_TYPE[measureCount];
+    for (int i = 0; i < measureCount; i++) {
+      CompressionFinder compressionFinder = ValueCompressionUtil
+          .getCompressionType(maxValue[i], minValue[i], decimal[i], type[i], dataTypeSelected[i]);
+      actualType[i] = compressionFinder.actualDataType;
+      changedType[i] = compressionFinder.changedDataType;
+      compType[i] = compressionFinder.compType;
+    }
+    compressionModel.setMaxValue(maxValue);
+    compressionModel.setDecimal(decimal);
+    compressionModel.setChangedDataType(changedType);
+    compressionModel.setCompType(compType);
+    compressionModel.setActualDataType(actualType);
+    compressionModel.setMinValue(minValue);
+    compressionModel.setUniqueValue(uniqueValue);
+    compressionModel.setType(type);
+    compressionModel.setMinValueFactForAgg(measureMDMdl.getMinValueFactForAgg());
+    compressionModel.setDataTypeSelected(dataTypeSelected);
+    ValueCompressonHolder.UnCompressValue[] values = ValueCompressionUtil
+        .getUncompressedValues(compressionModel.getCompType(), compressionModel.getActualDataType(),
+            compressionModel.getChangedDataType());
+    compressionModel.setUnCompressValues(values);
+    return compressionModel;
+  }
+
+  public static byte[] convertToBytes(short[] values) {
+    ByteBuffer buffer = ByteBuffer.allocate(values.length * 2);
+    for (short val : values) {
+      buffer.putShort(val);
+    }
+    return buffer.array();
+  }
+
+  public static byte[] convertToBytes(int[] values) {
+    ByteBuffer buffer = ByteBuffer.allocate(values.length * 4);
+    for (int val : values) {
+      buffer.putInt(val);
+    }
+    return buffer.array();
+  }
+
+  public static byte[] convertToBytes(float[] values) {
+    ByteBuffer buffer = ByteBuffer.allocate(values.length * 4);
+    for (float val : values) {
+      buffer.putFloat(val);
+    }
+    return buffer.array();
+  }
+
+  public static byte[] convertToBytes(long[] values) {
+    ByteBuffer buffer = ByteBuffer.allocate(values.length * 8);
+    for (long val : values) {
+      buffer.putLong(val);
+    }
+    return buffer.array();
+  }
+
+  public static byte[] convertToBytes(double[] values) {
+    ByteBuffer buffer = ByteBuffer.allocate(values.length * 8);
+    for (double val : values) {
+      buffer.putDouble(val);
+    }
+    return buffer.array();
+  }
+
+  public static short[] convertToShortArray(ByteBuffer buffer, int length) {
+    buffer.rewind();
+    short[] values = new short[length / 2];
+
+    for (int i = 0; i < values.length; i++) {
+      values[i] = buffer.getShort();
+    }
+    return values;
+  }
+
+  public static int[] convertToIntArray(ByteBuffer buffer, int length) {
+    buffer.rewind();
+    int[] values = new int[length / 4];
+
+    for (int i = 0; i < values.length; i++) {
+      values[i] = buffer.getInt();
+    }
+    return values;
+  }
+
+  public static float[] convertToFloatArray(ByteBuffer buffer, int length) {
+    buffer.rewind();
+    float[] values = new float[length / 4];
+
+    for (int i = 0; i < values.length; i++) {
+      values[i] = buffer.getFloat();
+    }
+    return values;
+  }
+
+  public static long[] convertToLongArray(ByteBuffer buffer, int length) {
+    buffer.rewind();
+    long[] values = new long[length / 8];
+    for (int i = 0; i < values.length; i++) {
+      values[i] = buffer.getLong();
+    }
+    return values;
+  }
+
+  public static double[] convertToDoubleArray(ByteBuffer buffer, int length) {
+    buffer.rewind();
+    double[] values = new double[length / 8];
+    for (int i = 0; i < values.length; i++) {
+      values[i] = buffer.getDouble();
+    }
+    return values;
+  }
+
+  /**
+   * used to identify the compression type
+   */
+  public enum COMPRESSION_TYPE {
+    NONE,
+    MAX_MIN,
+    NON_DECIMAL_CONVERT,
+    MAX_MIN_NDC,
+    CUSTOM,
+    CUSTOM_BIGDECIMAL
+  }
+
+  /**
+   * used to identify the type of data
+   */
+  public enum DataType {
+    DATA_BYTE,
+    DATA_SHORT,
+    DATA_INT,
+    DATA_FLOAT,
+    DATA_LONG,
+    DATA_BIGINT,
+    DATA_DOUBLE
+  }
+
+  /**
+   * Selects the best compression type based on the size of the changed data
+   * type, the priority, and the compression type.
+   */
+  private static class CompressionFinder implements Comparable<CompressionFinder> {
+    /**
+     * compType.
+     */
+    private COMPRESSION_TYPE compType;
+    /**
+     * actualDataType.
+     */
+    private DataType actualDataType;
+    /**
+     * changedDataType.
+     */
+    private DataType changedDataType;
+    /**
+     * the size of changed data
+     */
+    private int size;
+    /**
+     * priority.
+     */
+    private PRIORITY priority;
+
+    /**
+     * CompressionFinder constructor.
+     *
+     * @param compType        compression type
+     * @param actualDataType  actual data type
+     * @param changedDataType changed data type
+     */
+    CompressionFinder(COMPRESSION_TYPE compType, DataType actualDataType,
+        DataType changedDataType) {
+      super();
+      this.compType = compType;
+      this.actualDataType = actualDataType;
+      this.changedDataType = changedDataType;
+    }
+
+    /**
+     * CompressionFinder overloaded constructor.
+     *
+     * @param actualDataType  actual data type
+     * @param changedDataType changed data type
+     * @param priority        selection priority
+     * @param compType        compression type
+     */
+    CompressionFinder(DataType actualDataType, DataType changedDataType, PRIORITY priority,
+        COMPRESSION_TYPE compType) {
+      super();
+      this.actualDataType = actualDataType;
+      this.changedDataType = changedDataType;
+      this.size = getSize(changedDataType);
+      this.priority = priority;
+      this.compType = compType;
+    }
+
+    @Override public boolean equals(Object obj) {
+      boolean equals = false;
+      if (obj instanceof CompressionFinder) {
+        CompressionFinder cf = (CompressionFinder) obj;
+
+        if (this.size == cf.size && this.priority == cf.priority) {
+          equals = true;
+        }
+
+      }
+      return equals;
+    }
+
+    @Override public int hashCode() {
+      final int code = 31;
+      int result = 1;
+
+      result = code * result + this.size;
+      result = code * result + ((priority == null) ? 0 : priority.hashCode());
+      return result;
+    }
+
+    @Override public int compareTo(CompressionFinder o) {
+      int returnVal = 0;
+      // smaller size sorts first (ascending), so finders[0] wins after sorting
+      if (this.equals(o)) {
+        returnVal = 0;
+      } else if (this.size == o.size) {
+        // the compression type priority
+        if (priority.priority > o.priority.priority) {
+          returnVal = 1;
+        } else if (priority.priority < o.priority.priority) {
+          returnVal = -1;
+        }
+
+      } else if (this.size > o.size) {
+        returnVal = 1;
+      } else {
+        returnVal = -1;
+      }
+      return returnVal;
+    }
+
+    /**
+     * Compression type priority.
+     * ACTUAL is the highest priority and DIFFNONDECIMAL is the lowest.
+     */
+    enum PRIORITY {
+      ACTUAL(0),
+      DIFFSIZE(1),
+      MAXNONDECIMAL(2),
+      DIFFNONDECIMAL(3);
+
+      /**
+       * priority ordinal; lower sorts first
+       */
+      private int priority;
+
+      PRIORITY(int priority) {
+        this.priority = priority;
+      }
+    }
+  }
+
+}
+
+
+
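
To make the heaviest of the transformations above concrete: compressNonDecimalMaxMin
combines the max-min delta with decimal shifting. The following self-contained sketch
(plain arithmetic, not the CarbonData API; the values are made up) shows the round
trip for decimal = 1, where three doubles compress losslessly into single bytes:

    public class MaxMinNdcSketch {
      public static void main(String[] args) {
        double maxValue = 200.5;
        int decimal = 1;
        double[] values = { 198.5, 199.0, 200.5 };

        // compress: subValue = round((maxValue - value) * 10^decimal)
        byte[] compressed = new byte[values.length];
        for (int i = 0; i < values.length; i++) {
          compressed[i] = (byte) Math.round((maxValue - values[i]) * Math.pow(10, decimal));
        }

        // uncompress: value = maxValue - subValue / 10^decimal
        for (int i = 0; i < compressed.length; i++) {
          double restored = maxValue - compressed[i] / Math.pow(10, decimal);
          System.out.println(values[i] + " -> " + compressed[i] + " -> " + restored);
        }
      }
    }

Each 8-byte double is stored as a 1-byte delta (20, 15, 0) and restored exactly.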

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/ByteArrayHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/ByteArrayHolder.java b/core/src/main/java/org/apache/carbondata/core/writer/ByteArrayHolder.java
new file mode 100644
index 0000000..73bbf11
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/ByteArrayHolder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.writer;
+
+import java.util.Arrays;
+
+import org.apache.carbondata.core.util.ByteUtil;
+
+public class ByteArrayHolder implements Comparable<ByteArrayHolder> {
+
+  /**
+   * mdkey
+   */
+  private byte[] mdKey;
+
+  /**
+   * primary key
+   */
+  private int primaryKey;
+
+  /**
+   * @param mdKey      the mdkey byte array
+   * @param primaryKey the primary key
+   */
+  public ByteArrayHolder(byte[] mdKey, int primaryKey) {
+    this.mdKey = mdKey;
+    this.primaryKey = primaryKey;
+  }
+
+  @Override public int compareTo(ByteArrayHolder o) {
+    return ByteUtil.compare(mdKey, o.mdKey);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (obj instanceof ByteArrayHolder) {
+      if (0 == ByteUtil.compare(mdKey, ((ByteArrayHolder) obj).mdKey)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override public int hashCode() {
+    int prime = 31;
+    int result = prime * Arrays.hashCode(mdKey);
+    result = result + prime * primaryKey;
+    return result;
+  }
+
+  public byte[] getMdKey() {
+    return mdKey;
+  }
+
+  public int getPrimaryKey() {
+    return primaryKey;
+  }
+
+}
+
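
A short usage sketch (the values are made up): because compareTo delegates to
ByteUtil.compare on the mdkey, sorting a list of holders orders the primary
keys by their mdkey bytes.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    import org.apache.carbondata.core.writer.ByteArrayHolder;

    public class ByteArrayHolderSketch {
      public static void main(String[] args) {
        List<ByteArrayHolder> holders = new ArrayList<ByteArrayHolder>();
        holders.add(new ByteArrayHolder(new byte[] { 2, 0 }, 42));
        holders.add(new ByteArrayHolder(new byte[] { 1, 5 }, 7));
        Collections.sort(holders);  // ascending by mdkey: {1,5} before {2,0}
        for (ByteArrayHolder h : holders) {
          System.out.println(Arrays.toString(h.getMdKey()) + " -> " + h.getPrimaryKey());
        }
      }
    }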

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriter.java
new file mode 100644
index 0000000..24ea06e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * dictionary writer interface
+ */
+public interface CarbonDictionaryWriter extends Closeable {
+  /**
+   * write method that accepts one value at a time.
+   * This method can be used when the data is huge and memory is limited: the
+   * data can be stored in a file and an iterator can iterate over it,
+   * passing one value at a time
+   *
+   * @param value unique dictionary value
+   * @throws IOException if an I/O error occurs
+   */
+  void write(String value) throws IOException;
+
+  /**
+   * write method that accepts one byte array value at a time.
+   * This method can be used when the data is huge and memory is limited: the
+   * data can be stored in a file and an iterator can iterate over it,
+   * passing one value at a time
+   *
+   * @param value unique dictionary value
+   * @throws IOException if an I/O error occurs
+   */
+  void write(byte[] value) throws IOException;
+
+  /**
+   * write method that accepts a list of byte arrays as value.
+   * This can be used when the data is small; each string can be converted
+   * to a byte array and added to the list
+   *
+   * @param valueList list of byte array. Each byte array is unique dictionary value
+   * @throws IOException if an I/O error occurs
+   */
+  void write(List<byte[]> valueList) throws IOException;
+
+
+  void commit() throws IOException;
+}
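
A hedged sketch of the call sequence this contract implies, given the
implementation later in this commit (close() flushes the last buffered chunk
and commit() then records the chunk metadata from the file size, so commit
follows close). The helper below is hypothetical; the writer instance is
assumed to come from elsewhere.

    import java.io.IOException;
    import java.util.List;

    import org.apache.carbondata.core.writer.CarbonDictionaryWriter;

    public class DictionaryWriteSketch {
      static void writeDictionary(CarbonDictionaryWriter writer, List<byte[]> values)
          throws IOException {
        boolean success = false;
        try {
          for (byte[] value : values) {
            writer.write(value);  // buffered and flushed chunk by chunk
          }
          success = true;
        } finally {
          writer.close();         // flushes the final chunk and closes the stream
        }
        if (success) {
          writer.commit();        // persists offsets and the surrogate key range
        }
      }
    }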

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
new file mode 100644
index 0000000..2e08610
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.writer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.reader.CarbonDictionaryColumnMetaChunk;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl;
+import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.ColumnDictionaryChunk;
+import org.apache.carbondata.format.ColumnDictionaryChunkMeta;
+
+import org.apache.thrift.TBase;
+
+/**
+ * This class is responsible for writing the dictionary file and its metadata
+ */
+public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonDictionaryWriterImpl.class.getName());
+
+  /**
+   * carbon type identifier
+   */
+  protected CarbonTableIdentifier carbonTableIdentifier;
+
+  /**
+   * list which will hold values up to a maximum of one dictionary chunk size
+   */
+  private List<ByteBuffer> oneDictionaryChunkList;
+
+  /**
+   * Meta object which will hold last segment entry details
+   */
+  private CarbonDictionaryColumnMetaChunk chunkMetaObjectForLastSegmentEntry;
+
+  /**
+   * dictionary file and meta thrift writer
+   */
+  private ThriftWriter dictionaryThriftWriter;
+
+  /**
+   * column identifier
+   */
+  protected ColumnIdentifier columnIdentifier;
+
+  /**
+   * HDFS store path
+   */
+  protected String hdfsStorePath;
+
+  /**
+   * dictionary file path
+   */
+  protected String dictionaryFilePath;
+
+  /**
+   * dictionary metadata file path
+   */
+  protected String dictionaryMetaFilePath;
+
+  /**
+   * start offset of dictionary chunk  for a segment
+   */
+  private long chunk_start_offset;
+
+  /**
+   * end offset of a dictionary chunk for a segment
+   */
+  private long chunk_end_offset;
+
+  /**
+   * total dictionary value record count for one segment
+   */
+  private int totalRecordCount;
+
+  /**
+   * total thrift object chunk count written for one segment
+   */
+  private int chunk_count;
+
+  /**
+   * chunk size for a dictionary file after which data will be written to disk
+   */
+  private int dictionary_one_chunk_size;
+
+  /**
+   * flag to check whether write method is called for first time
+   */
+  private boolean isFirstTime;
+
+  private static final Charset defaultCharset = Charset.forName(
+      CarbonCommonConstants.DEFAULT_CHARSET);
+
+  /**
+   * Constructor
+   *
+   * @param hdfsStorePath         HDFS store path
+   * @param carbonTableIdentifier table identifier which will give table name and database name
+   * @param columnIdentifier      column unique identifier
+   */
+  public CarbonDictionaryWriterImpl(String hdfsStorePath,
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier) {
+    this.carbonTableIdentifier = carbonTableIdentifier;
+    this.columnIdentifier = columnIdentifier;
+    this.hdfsStorePath = hdfsStorePath;
+    this.isFirstTime = true;
+  }
+
+  /**
+   * This method will write the data in thrift format to disk. This method will be guided by
+   * parameter dictionary_one_chunk_size and data will be divided into chunks
+   * based on this parameter
+   *
+   * @param value unique dictionary value
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public void write(String value) throws IOException {
+    write(value.getBytes(defaultCharset));
+  }
+
+  /**
+   * This method will write the data in thrift format to disk. This method will be guided by
+   * parameter dictionary_one_chunk_size and data will be divided into chunks
+   * based on this parameter
+   *
+   * @param value unique dictionary value
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public void write(byte[] value) throws IOException {
+    if (isFirstTime) {
+      init();
+      isFirstTime = false;
+    }
+    // if one chunk size is equal to list size then write the data to file
+    checkAndWriteDictionaryChunkToFile();
+    oneDictionaryChunkList.add(ByteBuffer.wrap(value));
+    totalRecordCount++;
+  }
+
+  /**
+   * This method will write the data in thrift format to disk. This method will not be guided by
+   * parameter dictionary_one_chunk_size and complete data will be written as one chunk
+   *
+   * @param valueList list of byte array. Each byte array is unique dictionary value
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public void write(List<byte[]> valueList) throws IOException {
+    if (isFirstTime) {
+      init();
+      isFirstTime = false;
+    }
+    for (byte[] value : valueList) {
+      oneDictionaryChunkList.add(ByteBuffer.wrap(value));
+      totalRecordCount++;
+    }
+  }
+
+  /**
+   * write dictionary metadata file and close thrift object
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public void close() throws IOException {
+    if (null != dictionaryThriftWriter) {
+      writeDictionaryFile();
+      // close the thrift writer for dictionary file
+      closeThriftWriter();
+    }
+  }
+
+  /**
+   * check if the threshold has been reached for the number of
+   * values that can be kept in memory, and if so flush the data to file
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  private void checkAndWriteDictionaryChunkToFile() throws IOException {
+    if (oneDictionaryChunkList.size() >= dictionary_one_chunk_size) {
+      writeDictionaryFile();
+      createChunkList();
+    }
+  }
+
+  /**
+   * This method will serialize the dictionary chunk object to the dictionary file
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  private void writeDictionaryFile() throws IOException {
+    ColumnDictionaryChunk columnDictionaryChunk = new ColumnDictionaryChunk();
+    columnDictionaryChunk.setValues(oneDictionaryChunkList);
+    writeThriftObject(columnDictionaryChunk);
+  }
+
+  /**
+   * This method will check and create the directory path where the dictionary file has to be created
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  private void init() throws IOException {
+    initDictionaryChunkSize();
+    initPaths();
+    boolean dictFileExists = CarbonUtil.isFileExists(this.dictionaryFilePath);
+    if (dictFileExists && CarbonUtil.isFileExists(this.dictionaryMetaFilePath)) {
+      this.chunk_start_offset = CarbonUtil.getFileSize(this.dictionaryFilePath);
+      validateDictionaryFileOffsetWithLastSegmentEntryOffset();
+    } else if (dictFileExists) {
+      FileFactory.getCarbonFile(dictionaryFilePath, FileFactory.getFileType(dictionaryFilePath))
+          .delete();
+    }
+    openThriftWriter(this.dictionaryFilePath);
+    createChunkList();
+  }
+
+  protected void initPaths() {
+    PathService pathService = CarbonCommonFactory.getPathService();
+    CarbonTablePath carbonTablePath = pathService.getCarbonTablePath(columnIdentifier,
+            this.hdfsStorePath, carbonTableIdentifier);
+    this.dictionaryFilePath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId());
+    this.dictionaryMetaFilePath =
+        carbonTablePath.getDictionaryMetaFilePath(columnIdentifier.getColumnId());
+  }
+
+  /**
+   * initialize the number of dictionary values that can be kept in memory at a time
+   */
+  private void initDictionaryChunkSize() {
+    try {
+      dictionary_one_chunk_size = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.DICTIONARY_ONE_CHUNK_SIZE,
+              CarbonCommonConstants.DICTIONARY_ONE_CHUNK_SIZE_DEFAULT));
+    } catch (NumberFormatException e) {
+      dictionary_one_chunk_size =
+          Integer.parseInt(CarbonCommonConstants.DICTIONARY_ONE_CHUNK_SIZE_DEFAULT);
+      LOGGER.error("Dictionary chunk size not configured properly. Taking default size "
+              + dictionary_one_chunk_size);
+    }
+  }
+
+  /**
+   * initialize a new dictionary chunk list and increment the chunk count
+   */
+  private void createChunkList() {
+    this.oneDictionaryChunkList = new ArrayList<ByteBuffer>(dictionary_one_chunk_size);
+    chunk_count++;
+  }
+
+  /**
+   * if the file already exists, read the metadata file and
+   * validate the last entry's end offset against the file size. If
+   * they are not equal, some invalid data is present and needs
+   * to be truncated
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  private void validateDictionaryFileOffsetWithLastSegmentEntryOffset() throws IOException {
+    // read last dictionary chunk meta entry from dictionary metadata file
+    chunkMetaObjectForLastSegmentEntry = getChunkMetaObjectForLastSegmentEntry();
+    int bytesToTruncate =
+        (int) (chunk_start_offset - chunkMetaObjectForLastSegmentEntry.getEnd_offset());
+    if (bytesToTruncate > 0) {
+      LOGGER.info("some inconsistency in dictionary file for column " + this.columnIdentifier);
+      // truncate the dictionary data till chunk meta end offset
+      FileFactory.FileType fileType = FileFactory.getFileType(this.dictionaryFilePath);
+      CarbonFile carbonFile = FileFactory.getCarbonFile(this.dictionaryFilePath, fileType);
+      boolean truncateSuccess = carbonFile
+          .truncate(this.dictionaryFilePath, chunkMetaObjectForLastSegmentEntry.getEnd_offset());
+      if (!truncateSuccess) {
+        LOGGER.info("Diction file not truncated successfully for column " + this.columnIdentifier);
+      }
+    }
+  }
+
+  /**
+   * This method will write the dictionary metadata file for a given column
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  private void writeDictionaryMetadataFile() throws IOException {
+    // Format of dictionary metadata file
+    // min, max, start offset, end offset and chunk count
+    int min_surrogate_key = 0;
+    int max_surrogate_key = 0;
+    // case 1: first time dictionary writing
+    // previousMax = 0, totalRecordCount = 5, min = 1, max= 5
+    // case2: file already exists
+    // previousMax = 5, totalRecordCount = 10, min = 6, max = 15
+    // case 3: no unique values, total records 0
+    // previousMax = 15, totalRecordCount = 0, min = 15, max = 15
+    // both min and max equal to previous max
+    if (null != chunkMetaObjectForLastSegmentEntry) {
+      if (0 == totalRecordCount) {
+        min_surrogate_key = chunkMetaObjectForLastSegmentEntry.getMax_surrogate_key();
+      } else {
+        min_surrogate_key = chunkMetaObjectForLastSegmentEntry.getMax_surrogate_key() + 1;
+      }
+      max_surrogate_key =
+          chunkMetaObjectForLastSegmentEntry.getMax_surrogate_key() + totalRecordCount;
+    } else {
+      if (totalRecordCount > 0) {
+        min_surrogate_key = 1;
+      }
+      max_surrogate_key = totalRecordCount;
+    }
+    ColumnDictionaryChunkMeta dictionaryChunkMeta =
+        new ColumnDictionaryChunkMeta(min_surrogate_key, max_surrogate_key, chunk_start_offset,
+            chunk_end_offset, chunk_count);
+    openThriftWriter(this.dictionaryMetaFilePath);
+    // write dictionary metadata file
+    writeThriftObject(dictionaryChunkMeta);
+    closeThriftWriter();
+    LOGGER.info("Dictionary metadata file written successfully for column " + this.columnIdentifier
+            + " at path " + this.dictionaryMetaFilePath);
+  }
+
+  /**
+   * open thrift writer for writing dictionary chunk/meta object
+   *
+   * @param dictionaryFile can be dictionary file name or dictionary metadata file name
+   * @throws IOException if an I/O error occurs
+   */
+  private void openThriftWriter(String dictionaryFile) throws IOException {
+    // create thrift writer instance
+    dictionaryThriftWriter = new ThriftWriter(dictionaryFile, true);
+    // open the file stream
+    dictionaryThriftWriter.open();
+  }
+
+  /**
+   * This method will write the thrift object to a file
+   *
+   * @param dictionaryThriftObject can be dictionary thrift object or dictionary metadata
+   *                               thrift object
+   * @throws IOException if an I/O error occurs
+   */
+  private void writeThriftObject(TBase dictionaryThriftObject) throws IOException {
+    dictionaryThriftWriter.write(dictionaryThriftObject);
+  }
+
+  /**
+   * close dictionary thrift writer
+   */
+  private void closeThriftWriter() {
+    if (null != dictionaryThriftWriter) {
+      dictionaryThriftWriter.close();
+    }
+  }
+
+  /**
+   * This method will read the dictionary chunk metadata thrift object for last entry
+   *
+   * @return last entry of dictionary meta chunk
+   * @throws IOException if an I/O error occurs
+   */
+  private CarbonDictionaryColumnMetaChunk getChunkMetaObjectForLastSegmentEntry()
+      throws IOException {
+    CarbonDictionaryColumnMetaChunk carbonDictionaryColumnMetaChunk = null;
+    CarbonDictionaryMetadataReader columnMetadataReaderImpl = getDictionaryMetadataReader();
+    try {
+      // read the last segment entry for dictionary metadata
+      carbonDictionaryColumnMetaChunk =
+          columnMetadataReaderImpl.readLastEntryOfDictionaryMetaChunk();
+    } finally {
+      // Close metadata reader
+      columnMetadataReaderImpl.close();
+    }
+    return carbonDictionaryColumnMetaChunk;
+  }
+
+  /**
+   * @return a reader for this column's dictionary metadata
+   */
+  protected CarbonDictionaryMetadataReader getDictionaryMetadataReader() {
+    return new CarbonDictionaryMetadataReaderImpl(hdfsStorePath, carbonTableIdentifier,
+        columnIdentifier);
+  }
+
+  @Override public void commit() throws IOException {
+    if (null != dictionaryThriftWriter) {
+      this.chunk_end_offset = CarbonUtil.getFileSize(this.dictionaryFilePath);
+      writeDictionaryMetadataFile();
+    }
+  }
+}
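
The surrogate key bookkeeping in writeDictionaryMetadataFile reduces to simple
arithmetic. A self-contained restatement of the three cases from its comment
(the helper and its name are illustrative only):

    public class SurrogateRangeSketch {
      // previousMax is the last segment's max surrogate key, or 0 if none exists
      static int[] surrogateRange(int previousMax, int totalRecordCount) {
        int min = (totalRecordCount == 0) ? previousMax
            : (previousMax == 0 ? 1 : previousMax + 1);
        int max = previousMax + totalRecordCount;
        return new int[] { min, max };
      }

      public static void main(String[] args) {
        // case 1: first write          -> [1, 5]
        // case 2: file already exists  -> [6, 15]
        // case 3: no unique values     -> [15, 15]
        System.out.println(java.util.Arrays.toString(surrogateRange(0, 5)));
        System.out.println(java.util.Arrays.toString(surrogateRange(5, 10)));
        System.out.println(java.util.Arrays.toString(surrogateRange(15, 0)));
      }
    }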

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java
new file mode 100644
index 0000000..04d2b97
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer;
+
+import java.io.IOException;
+
+import org.apache.carbondata.format.FileFooter;
+
+/**
+ * Writes metadata block to the fact table file in thrift
+ * format org.apache.carbondata.format.FileFooter
+ */
+public class CarbonFooterWriter {
+
+  // Version number of this format class.
+  private static final int VERSION_NUMBER = 1;
+
+  // Fact file path
+  private String filePath;
+
+  public CarbonFooterWriter(String filePath) {
+    this.filePath = filePath;
+  }
+
+  /**
+   * Writes the FileFooter thrift object to the fact file, followed by the
+   * offset at which the footer starts so that readers can locate it.
+   *
+   * @param footer          footer metadata to be serialized
+   * @param currentPosition offset in the file at which the footer starts
+   * @throws IOException if an I/O error occurs while writing
+   */
+  public void writeFooter(FileFooter footer, long currentPosition) throws IOException {
+
+    ThriftWriter thriftWriter = openThriftWriter(filePath);
+    footer.setVersion(VERSION_NUMBER);
+    try {
+      thriftWriter.write(footer);
+      thriftWriter.writeOffset(currentPosition);
+    } finally {
+      thriftWriter.close();
+    }
+  }
+
+  /**
+   * Opens a thrift writer, in append mode, for writing the file footer.
+   */
+  private ThriftWriter openThriftWriter(String filePath) throws IOException {
+    // create thrift writer instance
+    ThriftWriter thriftWriter = new ThriftWriter(filePath, true);
+    // open the file stream
+    thriftWriter.open();
+    return thriftWriter;
+  }
+}
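
A minimal usage sketch for CarbonFooterWriter; the fact-file path, the populated
FileFooter instance, and the offset are assumptions for illustration:

    // Sketch: append the footer to an already written fact file.
    FileFooter footer = buildFooter();               // hypothetical helper that populates the thrift struct
    long footerStartOffset = 1024L;                  // hypothetical offset where the data blocks end
    CarbonFooterWriter writer = new CarbonFooterWriter("/tmp/part-0.carbondata");
    writer.writeFooter(footer, footerStartOffset);   // writes the footer plus the 8-byte start offset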

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileWriter.java
new file mode 100644
index 0000000..bf6fc3b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer;
+
+import java.io.IOException;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Writer class which will be used to write the index file in thrift format
+ */
+public class CarbonIndexFileWriter {
+
+  /**
+   * thrift writer object
+   */
+  private ThriftWriter thriftWriter;
+
+  /**
+   * Writes a thrift object to the index file.
+   *
+   * @param indexObject thrift object to be serialized
+   * @throws IOException if an I/O error occurs while writing
+   */
+  public void writeThrift(TBase indexObject) throws IOException {
+    thriftWriter.write(indexObject);
+  }
+
+  /**
+   * Below method will be used to open the thrift writer
+   *
+   * @param filePath file path where data need to be written
+   * @throws IOException throws io exception in case of any failure
+   */
+  public void openThriftWriter(String filePath) throws IOException {
+    // create thrift writer instance
+    thriftWriter = new ThriftWriter(filePath, true);
+    // open the file stream
+    thriftWriter.open();
+  }
+
+  /**
+   * Below method will be used to close the thrift writer
+   */
+  public void close() {
+    thriftWriter.close();
+  }
+}
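
A hedged usage sketch; the index file path and the thrift index object are
assumptions:

    // Sketch: open the writer, write one thrift object, then close.
    CarbonIndexFileWriter indexWriter = new CarbonIndexFileWriter();
    indexWriter.openThriftWriter("/tmp/part-0.carbonindex"); // hypothetical path
    try {
      indexWriter.writeThrift(indexObject); // any TBase instance, assumed built elsewhere
    } finally {
      indexWriter.close();
    }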

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/HierarchyValueWriterForCSV.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/HierarchyValueWriterForCSV.java b/core/src/main/java/org/apache/carbondata/core/writer/HierarchyValueWriterForCSV.java
new file mode 100644
index 0000000..61a89f9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/HierarchyValueWriterForCSV.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.writer;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.pentaho.di.core.exception.KettleException;
+
+public class HierarchyValueWriterForCSV {
+
+  /**
+   * logger instance
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(HierarchyValueWriterForCSV.class.getName());
+  /**
+   * hierarchyName
+   */
+  private String hierarchyName;
+
+  /**
+   * bufferedOutStream
+   */
+  private FileChannel outPutFileChannel;
+
+  /**
+   * storeFolderLocation
+   */
+  private String storeFolderLocation;
+
+  /**
+   * whether the output stream has been initialized
+   */
+  private boolean initialized;
+
+  /**
+   * counter for the number of hierarchy files already written
+   */
+  private int counter;
+
+  /**
+   * holder list for rows buffered before they are sorted and flushed
+   */
+  private List<ByteArrayHolder> byteArrayholder =
+      new ArrayList<ByteArrayHolder>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+
+  /**
+   * number of rows to buffer before flushing, capped by the graph rowset size
+   */
+  private int toflush;
+
+  public HierarchyValueWriterForCSV(String hierarchy, String storeFolderLocation) {
+    this.hierarchyName = hierarchy;
+    this.storeFolderLocation = storeFolderLocation;
+
+    CarbonProperties instance = CarbonProperties.getInstance();
+
+    this.toflush = Integer.parseInt(instance
+        .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
+
+    int rowSetSize = Integer.parseInt(instance.getProperty(CarbonCommonConstants.GRAPH_ROWSET_SIZE,
+        CarbonCommonConstants.GRAPH_ROWSET_SIZE_DEFAULT));
+
+    if (this.toflush > rowSetSize) {
+      this.toflush = rowSetSize;
+    }
+
+    updateCounter(hierarchy, storeFolderLocation);
+  }
+
+  /**
+   * @return Returns the buffered rows pending sort and flush.
+   */
+  public List<ByteArrayHolder> getByteArrayList() throws KettleException {
+    return byteArrayholder;
+  }
+
+  public FileChannel getBufferedOutStream() {
+    return outPutFileChannel;
+  }
+
+  private void updateCounter(final String meString, String storeFolderLocation) {
+    File storeFolder = new File(storeFolderLocation);
+
+    File[] listFiles = storeFolder.listFiles(new FileFilter() {
+
+      @Override public boolean accept(File file) {
+        return file.getName().indexOf(meString) > -1;
+      }
+    });
+
+    if (null == listFiles || listFiles.length == 0) {
+      counter = 0;
+      return;
+    }
+
+    for (File hierFile : listFiles) {
+      String hierFileName = hierFile.getName();
+
+      if (hierFileName.endsWith(CarbonCommonConstants.FILE_INPROGRESS_STATUS)) {
+        hierFileName = hierFileName.substring(0, hierFileName.lastIndexOf('.'));
+        try {
+          counter = Integer.parseInt(hierFileName.substring(hierFileName.length() - 1));
+        } catch (NumberFormatException nfe) {
+
+          if (new File(hierFileName + '0' + CarbonCommonConstants.LEVEL_FILE_EXTENSION).exists()) {
+            // Skip this file: the server may have gone down while the files were being
+            // merged, leaving the other hierarchy files undeleted and this file still
+            // marked in-progress. The files will be merged and renamed again.
+            LOGGER.info("Skipping: hierarchy file has already been renamed");
+            if (hierFile.delete()) {
+              LOGGER.info("Deleted the Inprogress hierarchy Files.");
+            }
+          } else {
+            // The levelfileName0.level file does not exist, which means the files were
+            // merged and the others deleted, but the server was restarted/killed while
+            // renaming this file from in-progress to normal. Rename it to normal now.
+
+            File inprogressFile = new File(storeFolder + File.separator + hierFile.getName());
+            File changetoName = new File(storeFolder + File.separator + hierFileName);
+
+            if (inprogressFile.renameTo(changetoName)) {
+              LOGGER.info(
+                  "Renaming the level Files while creating the new instance on server startup.");
+            }
+
+          }
+
+        }
+      }
+
+      String val = hierFileName.substring(hierFileName.length() - 1);
+
+      int parsedVal = getIntValue(val);
+
+      if (counter < parsedVal) {
+        counter = parsedVal;
+      }
+    }
+    counter++;
+  }
+
+  private int getIntValue(String val) {
+    int parsedVal = 0;
+    try {
+      parsedVal = Integer.parseInt(val);
+    } catch (NumberFormatException nfe) {
+      LOGGER.info("Hierarchy File is already renamed so there will not be"
+              + "any need to keep the counter");
+    }
+    return parsedVal;
+  }
+
+  private void initialize() throws KettleException {
+    initialized = true;
+
+    File f = new File(storeFolderLocation + File.separator + hierarchyName + counter
+        + CarbonCommonConstants.FILE_INPROGRESS_STATUS);
+
+    counter++;
+
+    FileOutputStream fos = null;
+
+    boolean isFileCreated = false;
+    if (!f.exists()) {
+      try {
+        isFileCreated = f.createNewFile();
+
+      } catch (IOException e) {
+        //not required: findbugs fix
+        throw new KettleException("unable to create member mapping file", e);
+      }
+      if (!isFileCreated) {
+        throw new KettleException("unable to create file" + f.getAbsolutePath());
+      }
+    }
+
+    try {
+      fos = new FileOutputStream(f);
+
+      outPutFileChannel = fos.getChannel();
+    } catch (FileNotFoundException e) {
+      closeStreamAndDeleteFile(f, outPutFileChannel, fos);
+      throw new KettleException("member Mapping File not found to write mapping info", e);
+    }
+  }
+
+  public void writeIntoHierarchyFile(byte[] bytes, int primaryKey) throws KettleException {
+    if (!initialized) {
+      initialize();
+    }
+
+    ByteBuffer byteBuffer = storeValueInCache(bytes, primaryKey);
+
+    try {
+      byteBuffer.flip();
+      outPutFileChannel.write(byteBuffer);
+    } catch (IOException e) {
+      throw new KettleException("Error while writting in the hierarchy mapping file", e);
+    }
+  }
+
+  private ByteBuffer storeValueInCache(byte[] bytes, int primaryKey) {
+
+    // allocate 4 extra bytes so the primary key can be appended after the mdkey bytes
+    ByteBuffer buffer = ByteBuffer.allocate(bytes.length + 4);
+
+    buffer.put(bytes);
+    buffer.putInt(primaryKey);
+
+    return buffer;
+  }
+
+  public void performRequiredOperation() throws KettleException {
+    if (byteArrayholder.size() == 0) {
+      return;
+    }
+    //write to the file and close the stream.
+    Collections.sort(byteArrayholder);
+
+    for (ByteArrayHolder byteArray : byteArrayholder) {
+      writeIntoHierarchyFile(byteArray.getMdKey(), byteArray.getPrimaryKey());
+    }
+
+    CarbonUtil.closeStreams(outPutFileChannel);
+
+    //rename the inprogress file to normal .level file
+    String filePath = this.storeFolderLocation + File.separator + hierarchyName + (counter - 1)
+        + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+    File inProgressFile = new File(filePath);
+    String inprogressFileName = inProgressFile.getName();
+
+    String changedFileName = inprogressFileName.substring(0, inprogressFileName.lastIndexOf('.'));
+
+    File orgFinalName = new File(this.storeFolderLocation + File.separator + changedFileName);
+
+    if (!inProgressFile.renameTo(orgFinalName)) {
+      LOGGER.error("Not able to rename file : " + inprogressFileName);
+    }
+
+    //create the new output stream
+    try {
+      initialize();
+    } catch (KettleException e) {
+      LOGGER.error("Unable to create output stream for file: " + hierarchyName + (counter - 1));
+    }
+
+    //clear the byte array holder also.
+    byteArrayholder.clear();
+  }
+
+  private void closeStreamAndDeleteFile(File f, Closeable... streams) throws KettleException {
+    boolean isDeleted = false;
+    for (Closeable stream : streams) {
+      if (null != stream) {
+        try {
+          stream.close();
+        } catch (IOException e) {
+          LOGGER.error(e, "unable to close the stream ");
+        }
+
+      }
+    }
+
+    // delete the file
+    isDeleted = f.delete();
+    if (!isDeleted) {
+      LOGGER.error("Unable to delete the file " + f.getAbsolutePath());
+    }
+
+  }
+
+  public String getHierarchyName() {
+    return hierarchyName;
+  }
+
+  public int getCounter() {
+    return counter;
+  }
+
+}
+
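
To make the record layout concrete: each row written by writeIntoHierarchyFile()
is the mdkey bytes immediately followed by the 4-byte primary key, exactly as
storeValueInCache() lays it out. The values below are illustrative:

    // Sketch of the per-row layout: [mdkey bytes][4-byte primary key]
    byte[] mdkey = new byte[] {1, 2, 3};             // hypothetical mdkey
    int primaryKey = 42;
    ByteBuffer record = ByteBuffer.allocate(mdkey.length + 4);
    record.put(mdkey);
    record.putInt(primaryKey);
    record.flip();                                    // ready for FileChannel.write()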

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
new file mode 100644
index 0000000..c232fb2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.writer;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+
+/**
+ * Simple class that makes it easy to write Thrift objects to disk.
+ */
+public class ThriftWriter {
+
+  /**
+   * buffer size
+   */
+  private static final int bufferSize = 2048;
+
+  /**
+   * File to write to.
+   */
+  private String fileName;
+
+  /**
+   * For writing to the file.
+   */
+  private DataOutputStream dataOutputStream;
+
+  /**
+   * For binary serialization of objects.
+   */
+  private TProtocol binaryOut;
+
+  /**
+   * flag to append to existing file
+   */
+  private boolean append;
+
+  /**
+   * Constructor.
+   */
+  public ThriftWriter(String fileName, boolean append) {
+    this.fileName = fileName;
+    this.append = append;
+  }
+
+  /**
+   * Open the file for writing.
+   */
+  public void open() throws IOException {
+    FileFactory.FileType fileType = FileFactory.getFileType(fileName);
+    dataOutputStream = FileFactory.getDataOutputStream(fileName, fileType, bufferSize, append);
+    binaryOut = new TCompactProtocol(new TIOStreamTransport(dataOutputStream));
+  }
+
+  /**
+   * Write the object to disk.
+   */
+  public void write(TBase t) throws IOException {
+    try {
+      t.write(binaryOut);
+      dataOutputStream.flush();
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Writes the given offset to the file as a raw long value.
+   *
+   * @param offset file offset to record, typically where a thrift object starts
+   * @throws IOException if an I/O error occurs while writing
+   */
+  public void writeOffset(long offset) throws IOException {
+    dataOutputStream.writeLong(offset);
+  }
+
+  /**
+   * Close the file stream.
+   */
+  public void close() {
+    CarbonUtil.closeStreams(dataOutputStream);
+  }
+
+  /**
+   * Flushes buffered data to the underlying HDFS file, when the stream supports hsync
+   */
+  public void sync() throws IOException {
+    if (dataOutputStream instanceof FSDataOutputStream) {
+      ((FSDataOutputStream) dataOutputStream).hsync();
+    }
+  }
+}
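
A minimal usage sketch for ThriftWriter; the path and the thrift object are
assumptions, and the write-then-offset pattern matches CarbonFooterWriter above:

    // Sketch: serialize a thrift object and record its start offset.
    ThriftWriter writer = new ThriftWriter("/tmp/meta.thrift", false); // hypothetical path, no append
    writer.open();
    try {
      writer.write(thriftObject);   // any TBase instance, assumed available
      writer.writeOffset(0L);       // offset at which the object starts in the file
    } finally {
      writer.close();
    }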

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/exception/CarbonDataWriterException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/exception/CarbonDataWriterException.java b/core/src/main/java/org/apache/carbondata/core/writer/exception/CarbonDataWriterException.java
new file mode 100644
index 0000000..2d978fb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/exception/CarbonDataWriterException.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.writer.exception;
+
+import java.util.Locale;
+
+public class CarbonDataWriterException extends Exception {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public CarbonDataWriterException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public CarbonDataWriterException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Returns the localized message from the superclass.
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * Returns the error message set on this exception.
+   */
+  @Override public String getMessage() {
+    return this.msg;
+  }
+
+  /**
+   * This method is used to get the localized message. Localization is not
+   * implemented, so an empty string is currently returned.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - Localized error message (currently always empty).
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriter.java
new file mode 100644
index 0000000..e9d7b1d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer.sortindex;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface for writing the dictionary sort index and inverted sort index data.
+ */
+public interface CarbonDictionarySortIndexWriter extends Closeable {
+
+  /**
+   * Writes the dictionary sort index data to the column's
+   * sort index file in thrift format.
+   *
+   * @param sortIndexList list of sort index entries
+   * @throws IOException if any I/O error occurs
+   */
+  void writeSortIndex(List<Integer> sortIndexList) throws IOException;
+
+  /**
+   * Writes the dictionary inverted sort index data to the column's
+   * sort index file in thrift format.
+   *
+   * @param invertedSortIndexList list of inverted sort index entries
+   * @throws IOException if any I/O error occurs
+   */
+  void writeInvertedSortIndex(List<Integer> invertedSortIndexList) throws IOException;
+
+}
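
A worked example of the two lists, under the assumption that surrogate keys are
1-based and assigned in write order. Given dictionary values written as
key 1 = "cherry", key 2 = "apple", key 3 = "banana", the sorted order is apple,
banana, cherry (java.util.Arrays and java.util.List imported; sortIndexWriter is
an assumed instance of the interface above):

    // Hedged example: surrogate keys listed in sorted-value order ...
    List<Integer> sortIndex = Arrays.asList(2, 3, 1);
    // ... and, for each surrogate key 1..3, its rank in the sorted order.
    List<Integer> invertedSortIndex = Arrays.asList(3, 1, 2);
    sortIndexWriter.writeSortIndex(sortIndex);
    sortIndexWriter.writeInvertedSortIndex(invertedSortIndex);
    sortIndexWriter.close(); // the implementation below persists the data on close()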

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java
new file mode 100644
index 0000000..b6df97d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer.sortindex;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.writer.ThriftWriter;
+import org.apache.carbondata.format.ColumnSortInfo;
+
+/**
+ * The class responsible for writing the dictionary/column sort index and sort index inverted data
+ * in the thrift format
+ */
+public class CarbonDictionarySortIndexWriterImpl implements CarbonDictionarySortIndexWriter {
+
+  /**
+   * carbonTable Identifier holding the info of databaseName and tableName
+   */
+  protected CarbonTableIdentifier carbonTableIdentifier;
+
+  /**
+   * column unique identifier
+   */
+  protected ColumnIdentifier columnIdentifier;
+
+  /**
+   * carbon store location
+   */
+  protected String carbonStorePath;
+  /**
+   * Path of dictionary sort index file for which the sortIndex to be written
+   */
+  protected String sortIndexFilePath;
+  /**
+   * Instance of thrift writer to write the data
+   */
+  private ThriftWriter sortIndexThriftWriter;
+
+  /**
+   * Column sort info thrift instance.
+   */
+  private ColumnSortInfo columnSortInfo = new ColumnSortInfo();
+
+  /**
+   * logger instance
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonDictionarySortIndexWriterImpl.class.getName());
+
+  /**
+   * @param carbonTableIdentifier table identifier which will give table name and database name
+   * @param columnIdentifier      column unique identifier
+   * @param carbonStorePath       carbon store path
+   */
+  public CarbonDictionarySortIndexWriterImpl(final CarbonTableIdentifier carbonTableIdentifier,
+      final ColumnIdentifier columnIdentifier, final String carbonStorePath) {
+    this.carbonTableIdentifier = carbonTableIdentifier;
+    this.columnIdentifier = columnIdentifier;
+    this.carbonStorePath = carbonStorePath;
+  }
+
+  /**
+   * Populates the dictionary sort index data into columnSortInfo, to be
+   * written out in thrift format when the writer is closed.
+   *
+   * @param sortIndexList list of sort index entries
+   * @throws IOException if any I/O error occurs
+   */
+  @Override public void writeSortIndex(List<Integer> sortIndexList) throws IOException {
+    columnSortInfo.setSort_index(sortIndexList);
+  }
+
+  /**
+   * Populates the dictionary inverted sort index data into columnSortInfo, to be
+   * written out in thrift format when the writer is closed.
+   *
+   * @param invertedSortIndexList list of inverted sort index entries
+   * @throws IOException if any I/O error occurs
+   */
+  @Override public void writeInvertedSortIndex(List<Integer> invertedSortIndexList)
+      throws IOException {
+    columnSortInfo.setSort_index_inverted(invertedSortIndexList);
+  }
+
+  /**
+   * Initializes the sortIndexFilePath, opens the thrift writer stream, and
+   * writes the column sort info to the store. The write happens only when both
+   * the sort index and the inverted sort index are populated; a columnSortInfo
+   * with a null sort index or inverted sort index is not written. Any existing
+   * sort index file is overwritten with the new data.
+   */
+  private void writeColumnSortInfo() throws IOException {
+    boolean isNotNull =
+        null != columnSortInfo.getSort_index() && null != columnSortInfo.getSort_index_inverted();
+    if (isNotNull) {
+      initPath();
+      String folderContainingFile = CarbonTablePath.getFolderContainingFile(this.sortIndexFilePath);
+      boolean created = CarbonUtil.checkAndCreateFolder(folderContainingFile);
+      if (!created) {
+        LOGGER.error("Database metadata folder creation status :: " + created);
+        throw new IOException("Failed to created database metadata folder");
+      }
+      try {
+
+        this.sortIndexThriftWriter = new ThriftWriter(this.sortIndexFilePath, false);
+        this.sortIndexThriftWriter.open();
+        sortIndexThriftWriter.write(columnSortInfo);
+      } catch (IOException ie) {
+        LOGGER.error(ie,
+            "problem while writing the dictionary sort index file.");
+        throw new IOException("problem while writing the dictionary sort index file.", ie);
+      } finally {
+        if (null != sortIndexThriftWriter) {
+          this.sortIndexThriftWriter.close();
+        }
+        this.sortIndexFilePath = null;
+      }
+    }
+  }
+
+  protected void initPath() {
+    PathService pathService = CarbonCommonFactory.getPathService();
+    CarbonTablePath carbonTablePath = pathService
+        .getCarbonTablePath(columnIdentifier, carbonStorePath, carbonTableIdentifier);
+    String dictionaryPath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId());
+    long dictOffset = CarbonUtil.getFileSize(dictionaryPath);
+    this.sortIndexFilePath =
+        carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId(), dictOffset);
+    cleanUpOldSortIndex(carbonTablePath, dictionaryPath);
+  }
+
+  /**
+   * Cleans up old unused sort index files that have outlived the maximum
+   * query execution time.
+   *
+   * @param carbonTablePath carbon table path used to list the sort index files
+   * @param dictPath        path of the dictionary file of this column
+   */
+  protected void cleanUpOldSortIndex(CarbonTablePath carbonTablePath, String dictPath) {
+    CarbonFile dictFile =
+        FileFactory.getCarbonFile(dictPath, FileFactory.getFileType(dictPath));
+    CarbonFile[] files =
+        carbonTablePath.getSortIndexFiles(dictFile.getParentFile(),
+            columnIdentifier.getColumnId());
+    int maxTime;
+    try {
+      maxTime = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
+    } catch (NumberFormatException e) {
+      maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
+    }
+    if (null != files) {
+      Arrays.sort(files, new Comparator<CarbonFile>() {
+        @Override public int compare(CarbonFile o1, CarbonFile o2) {
+          return o1.getName().compareTo(o2.getName());
+        }
+      });
+      for (int i = 0; i < files.length - 1; i++) {
+        long difference = System.currentTimeMillis() - files[i].getLastModifiedTime();
+        long minutesElapsed = (difference / (1000 * 60));
+        if (minutesElapsed > maxTime) {
+          if (!files[i].delete()) {
+            LOGGER.warn("Failed to delete sortindex file." + files[i].getAbsolutePath());
+          } else {
+            LOGGER.info("Sort index file is deleted." + files[i].getAbsolutePath());
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated
+   * with it. If the stream is already closed then invoking this
+   * method has no effect.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public void close() throws IOException {
+    writeColumnSortInfo();
+    if (null != sortIndexThriftWriter) {
+      sortIndexThriftWriter.close();
+    }
+  }
+}
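
A hedged end-to-end sketch of the implementation above; the identifiers, the
index lists, and the store path are assumptions, and note that close() is what
actually writes the thrift file:

    // Sketch: populate both indexes, then close to flush the ColumnSortInfo.
    CarbonDictionarySortIndexWriter writer =
        new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, columnIdentifier,
            "/tmp/carbon.store"); // hypothetical store path
    try {
      writer.writeSortIndex(sortIndex);                 // produced by the dictionary sort step
      writer.writeInvertedSortIndex(invertedSortIndex);
    } finally {
      writer.close(); // triggers writeColumnSortInfo(), which writes the sort index file
    }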