You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/15 07:08:55 UTC

[10/52] [partial] incubator-carbondata git commit: Renamed packages to org.apache.carbondata and fixed errors

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/util/LoadStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/LoadStatistics.java b/core/src/main/java/org/carbondata/core/util/LoadStatistics.java
deleted file mode 100644
index e5f24e6..0000000
--- a/core/src/main/java/org/carbondata/core/util/LoadStatistics.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.util;
-
-public interface LoadStatistics {
-  // Init the partition info ("Partiton" in the method name is a historical misspelling)
-  void  initPartitonInfo(String PartitionId);
-
-  // Record the timings of the load steps
-  void recordDicShuffleAndWriteTime();
-
-  void recordLoadCsvfilesToDfTime();
-
-  void recordDictionaryValuesTotalTime(String partitionID,
-      Long dictionaryValuesTotalTimeTimePoint);
-
-  void recordCsvInputStepTime(String partitionID,
-      Long csvInputStepTimePoint);
-
-  void recordLruCacheLoadTime(double lruCacheLoadTime);
-
-  void recordGeneratingDictionaryValuesTime(String partitionID,
-      Long generatingDictionaryValuesTimePoint);
-
-  void recordSortRowsStepTotalTime(String partitionID,
-      Long sortRowsStepTotalTimePoint);
-
-  void recordMdkGenerateTotalTime(String partitionID,
-      Long mdkGenerateTotalTimePoint);
-
-  void recordDictionaryValue2MdkAdd2FileTime(String partitionID,
-      Long dictionaryValue2MdkAdd2FileTimePoint);
-
-  // Record the number of blocks per host (node blocks information map)
-  void recordHostBlockMap(String host, Integer numBlocks);
-
-  // Record the number of blocks per partition (partition blocks information map)
-  void recordPartitionBlockMap(String partitionID, Integer numBlocks);
-
-  // Record the total number of records processed
-  void recordTotalRecords(long totalRecords);
-
-  // Print the statistics information of the given partition
-  void printStatisticsInfo(String partitionID);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/util/ValueCompressionUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/ValueCompressionUtil.java b/core/src/main/java/org/carbondata/core/util/ValueCompressionUtil.java
deleted file mode 100644
index ac2281d..0000000
--- a/core/src/main/java/org/carbondata/core/util/ValueCompressionUtil.java
+++ /dev/null
@@ -1,1027 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.util;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.datastorage.store.compression.MeasureMetaDataModel;
-import org.carbondata.core.datastorage.store.compression.ValueCompressionModel;
-import org.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressByteArray;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressDefaultLong;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinByte;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinDefault;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinFloat;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinInt;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinLong;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinShort;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalByte;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalDefault;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalFloat;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalInt;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalLong;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinByte;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinDefault;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinFloat;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinInt;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinLong;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalMaxMinShort;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNonDecimalShort;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNoneByte;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNoneDefault;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNoneFloat;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNoneInt;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNoneLong;
-import org.carbondata.core.datastorage.store.compression.type.UnCompressNoneShort;
-
-public final class ValueCompressionUtil {
-
-  /**
-   * Attribute for Carbon LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(ValueCompressionUtil.class.getName());
-
-  private ValueCompressionUtil() {
-    // only static members are declared here; the private constructor blocks instantiation
-  }
-
-  /**
-   * decide actual type of value: the narrowest type whose upper bound lies above it
-   *
-   * @param value            the measure value
-   * @param decimal          decimal digits of the measure; 0 selects an integral type
-   * @param dataTypeSelected 1 permits DATA_FLOAT when the float cast is lossless
-   * @return actual type of value, DATA_DOUBLE when nothing narrower applies
-   */
-  private static DataType getDataType(double value, int decimal, byte dataTypeSelected) {
-    if (decimal == 0) {
-      // NOTE(review): only upper bounds are compared, so a large negative value is also
-      // classified as DATA_BYTE - confirm whether callers can pass such values
-      if (value < Byte.MAX_VALUE) {
-        return DataType.DATA_BYTE;
-      }
-      if (value < Short.MAX_VALUE) {
-        return DataType.DATA_SHORT;
-      }
-      if (value < Integer.MAX_VALUE) {
-        return DataType.DATA_INT;
-      }
-      if (value < Long.MAX_VALUE) {
-        return DataType.DATA_LONG;
-      }
-      // too large for any integral type
-      return DataType.DATA_DOUBLE;
-    }
-
-    if (dataTypeSelected == 1 && value < Float.MAX_VALUE) {
-      float narrowed = (float) value;
-      // float is chosen only when the narrowing cast loses nothing
-      if (narrowed - value == 0) {
-        return DataType.DATA_FLOAT;
-      }
-    }
-    return DataType.DATA_DOUBLE;
-  }
-
-  /**
-   * Gives the storage width of one value of the given type.
-   *
-   * @param dataType measure value type
-   * @return 1 for DATA_BYTE, 2 for DATA_SHORT, 4 for DATA_INT and DATA_FLOAT,
-   *         8 for every other type
-   */
-  public static int getSize(DataType dataType) {
-    switch (dataType) {
-      case DATA_INT:
-      case DATA_FLOAT:
-        return 4;
-      case DATA_SHORT:
-        return 2;
-      case DATA_BYTE:
-        return 1;
-      default:
-        // DATA_LONG, DATA_BIGINT and DATA_DOUBLE
-        return 8;
-    }
-  }
-
-  /**
-   * Chooses how one measure is compressed. Candidate priority list, from high to low:
-   * COMPRESSION_TYPE.NONE COMPRESSION_TYPE.MAX_MIN
-   * COMPRESSION_TYPE.NON_DECIMAL_CONVERT COMPRESSION_TYPE.MAX_MIN_NDC
-   *
-   * @param maxValue         max value of one measure
-   * @param minValue         min value of one measure
-   * @param decimal          decimal num of one measure
-   * @param aggregatorType   'c' aggregate table, 'b' big decimal, 'l' long, else double
-   * @param dataTypeSelected passed through to getDataType
-   * @return the best compression type together with the actual and changed data types
-   */
-  private static CompressionFinder getCompressionType(Object maxValue, Object minValue, int decimal,
-      char aggregatorType, byte dataTypeSelected) {
-    // 'c', 'b' and 'l' have a fixed outcome that does not depend on max/min
-    if (aggregatorType == 'c') {
-      return new CompressionFinder(COMPRESSION_TYPE.CUSTOM, DataType.DATA_BYTE,
-          DataType.DATA_BYTE);
-    }
-    if (aggregatorType == 'b') {
-      return new CompressionFinder(COMPRESSION_TYPE.CUSTOM_BIGDECIMAL, DataType.DATA_BYTE,
-          DataType.DATA_BYTE);
-    }
-    if (aggregatorType == 'l') {
-      return new CompressionFinder(COMPRESSION_TYPE.NONE, DataType.DATA_BIGINT,
-          DataType.DATA_BIGINT);
-    }
-
-    // everything else is handled as double; the casts fail for any other value class
-    double max = (double) maxValue;
-    double range = max - (double) minValue;
-    DataType maxType = getDataType(max, decimal, dataTypeSelected);
-    DataType rangeType = getDataType(range, decimal, dataTypeSelected);
-
-    if (decimal == 0) {
-      // None Decimal: compare the width needed for max with the width needed for max - min
-      int maxSize = getSize(maxType);
-      int rangeSize = getSize(rangeType);
-      if (maxSize > rangeSize) {
-        return new CompressionFinder(COMPRESSION_TYPE.MAX_MIN, DataType.DATA_DOUBLE, rangeType);
-      }
-      if (maxSize < rangeSize) {
-        // the difference is wider than max, so it dictates the stored type
-        return new CompressionFinder(COMPRESSION_TYPE.NONE, DataType.DATA_DOUBLE, rangeType);
-      }
-      return new CompressionFinder(COMPRESSION_TYPE.NONE, DataType.DATA_DOUBLE, maxType);
-    }
-
-    // decimal: one candidate per strategy, the one that sorts first wins
-    double scale = Math.pow(10, decimal);
-    DataType scaledMaxType = getDataType(scale * max, 0, dataTypeSelected);
-    DataType scaledRangeType = getDataType(scale * range, 0, dataTypeSelected);
-
-    CompressionFinder[] candidates = new CompressionFinder[] {
-        new CompressionFinder(maxType, maxType, CompressionFinder.PRIORITY.ACTUAL,
-            COMPRESSION_TYPE.NONE),
-        new CompressionFinder(maxType, rangeType, CompressionFinder.PRIORITY.DIFFSIZE,
-            COMPRESSION_TYPE.MAX_MIN),
-        new CompressionFinder(maxType, scaledMaxType, CompressionFinder.PRIORITY.MAXNONDECIMAL,
-            COMPRESSION_TYPE.NON_DECIMAL_CONVERT),
-        new CompressionFinder(maxType, scaledRangeType, CompressionFinder.PRIORITY.DIFFNONDECIMAL,
-            COMPRESSION_TYPE.MAX_MIN_NDC) };
-
-    // ascending sort: smallest stored size first, ties go to the lower priority value
-    Arrays.sort(candidates);
-    return candidates[0];
-  }
-
-  /**
-   * Compresses the values of one measure with the given compression type.
-   *
-   * @param compType        : compression type; everything but NONE, MAX_MIN and
-   *                        NON_DECIMAL_CONVERT is treated as MAX_MIN_NDC
-   * @param values          : the data of one measure
-   * @param changedDataType : changed data type
-   * @param maxValue        : the max value of one measure
-   * @param decimal         : the decimal length of one measure
-   * @return the compress data array, whose element type follows changedDataType
-   */
-  public static Object getCompressedValues(COMPRESSION_TYPE compType, double[] values,
-      DataType changedDataType, double maxValue, int decimal) {
-    final Object compressed;
-    switch (compType) {
-      case NONE:
-        // plain cast to the changed type
-        compressed = compressNone(changedDataType, values);
-        break;
-      case MAX_MIN:
-        compressed = compressMaxMin(changedDataType, values, maxValue);
-        break;
-      case NON_DECIMAL_CONVERT:
-        compressed = compressNonDecimal(changedDataType, values, decimal);
-        break;
-      default:
-        // MAX_MIN_NDC; the CUSTOM types are not special-cased and end up here as well
-        compressed = compressNonDecimalMaxMin(changedDataType, values, decimal, maxValue);
-        break;
-    }
-
-    return compressed;
-  }
-
-  /** Long variant: no compression type changes long values, the input array is returned. */
-  public static Object getCompressedValues(COMPRESSION_TYPE compType, long[] values,
-      DataType changedDataType, long maxValue, int decimal) {
-    // the switch is kept only because it rejects a null compType, as before
-    switch (compType) {
-      default:
-        return values;
-    }
-  }
-
-  /**
-   * Builds one uncompress handler per measure; the three arrays are read index by index.
-   */
-  private static ValueCompressonHolder.UnCompressValue[] getUncompressedValues(
-      COMPRESSION_TYPE[] compType, DataType[] actualDataType, DataType[] changedDataType) {
-    int measureCount = changedDataType.length;
-    ValueCompressonHolder.UnCompressValue[] holders =
-        new ValueCompressonHolder.UnCompressValue[measureCount];
-    for (int idx = 0; idx < measureCount; idx++) {
-      DataType changed = changedDataType[idx];
-      ValueCompressonHolder.UnCompressValue holder;
-      switch (compType[idx]) {
-        case NONE:
-          holder = unCompressNone(changed, actualDataType[idx]);
-          break;
-        case MAX_MIN:
-          holder = unCompressMaxMin(changed, actualDataType[idx]);
-          break;
-        case NON_DECIMAL_CONVERT:
-          // the actual type is fixed to double here, actualDataType[idx] is not consulted
-          holder = unCompressNonDecimal(changed, DataType.DATA_DOUBLE);
-          break;
-        case CUSTOM:
-          holder = new UnCompressByteArray(UnCompressByteArray.ByteArrayType.BYTE_ARRAY);
-          break;
-        case CUSTOM_BIGDECIMAL:
-          holder = new UnCompressByteArray(UnCompressByteArray.ByteArrayType.BIG_DECIMAL);
-          break;
-        default:
-          // MAX_MIN_NDC
-          holder = unCompressNonDecimalMaxMin(changed, null);
-          break;
-      }
-      holders[idx] = holder;
-    }
-    return holders;
-  }
-
-  /**
-   * Converts the values to the changed type with a plain cast and nothing else,
-   * for example: double -> int. When the changed type has no narrower array form
-   * (DATA_DOUBLE) the input array itself is returned.
-   */
-  private static Object compressNone(DataType changedDataType, double[] value) {
-    switch (changedDataType) {
-      case DATA_BYTE: {
-        byte[] bytes = new byte[value.length];
-
-        for (int idx = 0; idx < bytes.length; idx++) {
-          bytes[idx] = (byte) value[idx];
-        }
-
-        return bytes;
-      }
-
-      case DATA_SHORT: {
-        short[] shorts = new short[value.length];
-
-        for (int idx = 0; idx < shorts.length; idx++) {
-          shorts[idx] = (short) value[idx];
-        }
-
-        return shorts;
-      }
-
-      case DATA_INT: {
-        int[] ints = new int[value.length];
-
-        for (int idx = 0; idx < ints.length; idx++) {
-          ints[idx] = (int) value[idx];
-        }
-
-        return ints;
-      }
-
-      case DATA_LONG:
-      case DATA_BIGINT: {
-        long[] longs = new long[value.length];
-
-        for (int idx = 0; idx < longs.length; idx++) {
-          longs[idx] = (long) value[idx];
-        }
-
-        return longs;
-      }
-
-      case DATA_FLOAT: {
-        float[] floats = new float[value.length];
-
-        for (int idx = 0; idx < floats.length; idx++) {
-          floats[idx] = (float) value[idx];
-        }
-
-        return floats;
-      }
-
-      default:
-        // DATA_DOUBLE: the values already have the target type,
-        // hand the array back without copying
-        return value;
-    }
-  }
-
-  /**
-   * Stores the distance to the max value in the changed type, for example:
-   * 1. subValue = maxValue - value 2. subValue: double->int
-   * The double fallback stores the distance without narrowing.
-   */
-  private static Object compressMaxMin(DataType changedDataType, double[] value, double maxValue) {
-    switch (changedDataType) {
-      case DATA_BYTE: {
-        byte[] bytes = new byte[value.length];
-
-        for (int idx = 0; idx < bytes.length; idx++) {
-          bytes[idx] = (byte) (maxValue - value[idx]);
-        }
-
-        return bytes;
-      }
-
-      case DATA_SHORT: {
-        short[] shorts = new short[value.length];
-
-        for (int idx = 0; idx < shorts.length; idx++) {
-          shorts[idx] = (short) (maxValue - value[idx]);
-        }
-
-        return shorts;
-      }
-
-      case DATA_INT: {
-        int[] ints = new int[value.length];
-
-        for (int idx = 0; idx < ints.length; idx++) {
-          ints[idx] = (int) (maxValue - value[idx]);
-        }
-
-        return ints;
-      }
-
-      case DATA_LONG: {
-        long[] longs = new long[value.length];
-
-        for (int idx = 0; idx < longs.length; idx++) {
-          longs[idx] = (long) (maxValue - value[idx]);
-        }
-
-        return longs;
-      }
-
-      case DATA_FLOAT: {
-        float[] floats = new float[value.length];
-
-        for (int idx = 0; idx < floats.length; idx++) {
-          floats[idx] = (float) (maxValue - value[idx]);
-        }
-
-        return floats;
-      }
-
-      default: {
-        double[] doubles = new double[value.length];
-
-        for (int idx = 0; idx < doubles.length; idx++) {
-          doubles[idx] = maxValue - value[idx];
-        }
-
-        return doubles;
-      }
-    }
-  }
-
-  /**
-   * Turns decimals into whole numbers before narrowing, for example:
-   * 1. subValue = Math.round(value * Math.pow(10, decimal)) 2. subValue: double->int
-   */
-  private static Object compressNonDecimal(DataType changedDataType, double[] value, int decimal) {
-    double scale = Math.pow(10, decimal);
-
-    switch (changedDataType) {
-      case DATA_BYTE: {
-        byte[] bytes = new byte[value.length];
-        for (int idx = 0; idx < bytes.length; idx++) {
-          bytes[idx] = (byte) Math.round(scale * value[idx]);
-        }
-
-        return bytes;
-      }
-
-      case DATA_SHORT: {
-        short[] shorts = new short[value.length];
-        for (int idx = 0; idx < shorts.length; idx++) {
-          shorts[idx] = (short) Math.round(scale * value[idx]);
-        }
-
-        return shorts;
-      }
-
-      case DATA_INT: {
-        int[] ints = new int[value.length];
-        for (int idx = 0; idx < ints.length; idx++) {
-          ints[idx] = (int) Math.round(scale * value[idx]);
-        }
-
-        return ints;
-      }
-
-      case DATA_LONG: {
-        long[] longs = new long[value.length];
-        for (int idx = 0; idx < longs.length; idx++) {
-          longs[idx] = Math.round(scale * value[idx]);
-        }
-
-        return longs;
-      }
-
-      case DATA_FLOAT: {
-        float[] floats = new float[value.length];
-        for (int idx = 0; idx < floats.length; idx++) {
-          floats[idx] = (float) Math.round(scale * value[idx]);
-        }
-
-        return floats;
-      }
-
-      default: {
-        double[] doubles = new double[value.length];
-        for (int idx = 0; idx < doubles.length; idx++) {
-          doubles[idx] = (double) Math.round(scale * value[idx]);
-        }
-
-        return doubles;
-      }
-    }
-  }
-
-  /**
-   * compress data to other type through sub value for example: 1. subValue =
-   * maxValue - value 2. subValue = Math.round(subValue * Math.pow(10, decimal)) 3.
-   * subValue: double->int. Each branch narrows to its own element type; going through
-   * byte in every branch would wrap any sub value outside -128..127.
-   */
-  private static Object compressNonDecimalMaxMin(DataType changedDataType, double[] value,
-      int decimal, double maxValue) {
-    double scale = Math.pow(10, decimal);
-    switch (changedDataType) {
-      case DATA_BYTE: {
-        byte[] bytes = new byte[value.length];
-
-        for (int idx = 0; idx < bytes.length; idx++) {
-          bytes[idx] = (byte) Math.round((maxValue - value[idx]) * scale);
-        }
-
-        return bytes;
-      }
-
-      case DATA_SHORT: {
-        short[] shorts = new short[value.length];
-
-        for (int idx = 0; idx < shorts.length; idx++) {
-          shorts[idx] = (short) Math.round((maxValue - value[idx]) * scale);
-        }
-
-        return shorts;
-      }
-
-      case DATA_INT: {
-        int[] ints = new int[value.length];
-
-        for (int idx = 0; idx < ints.length; idx++) {
-          ints[idx] = (int) Math.round((maxValue - value[idx]) * scale);
-        }
-
-        return ints;
-      }
-
-      case DATA_LONG: {
-        long[] longs = new long[value.length];
-
-        for (int idx = 0; idx < longs.length; idx++) {
-          longs[idx] = Math.round((maxValue - value[idx]) * scale);
-        }
-
-        return longs;
-      }
-
-      case DATA_FLOAT: {
-        float[] floats = new float[value.length];
-
-        for (int idx = 0; idx < floats.length; idx++) {
-          floats[idx] = (float) Math.round((maxValue - value[idx]) * scale);
-        }
-
-        return floats;
-      }
-
-      default: {
-        double[] doubles = new double[value.length];
-
-        for (int idx = 0; idx < doubles.length; idx++) {
-          doubles[idx] = (double) Math.round((maxValue - value[idx]) * scale);
-        }
-
-        return doubles;
-      }
-    }
-  }
-
-  /**
-   * Picks the handler that reads back values stored without transformation, for example
-   * int -> double. A DATA_BIGINT actual type always gets the long handler, whatever
-   * compDataType says.
-   */
-  public static ValueCompressonHolder.UnCompressValue unCompressNone(DataType compDataType,
-      DataType actualDataType) {
-    if (actualDataType == DataType.DATA_BIGINT) {
-      return new UnCompressDefaultLong();
-    }
-
-    final ValueCompressonHolder.UnCompressValue handler;
-    switch (compDataType) {
-      case DATA_BYTE:
-        handler = new UnCompressNoneByte();
-        break;
-      case DATA_SHORT:
-        handler = new UnCompressNoneShort();
-        break;
-      case DATA_INT:
-        handler = new UnCompressNoneInt();
-        break;
-      case DATA_LONG:
-        handler = new UnCompressNoneLong();
-        break;
-      case DATA_FLOAT:
-        handler = new UnCompressNoneFloat();
-        break;
-      default:
-        // every other type
-        handler = new UnCompressNoneDefault();
-        break;
-    }
-    return handler;
-  }
-
-  /**
-   * Picks the handler that restores values stored as distance to the max value:
-   * 1. value = maxValue - subValue 2. value: int->double
-   * Only compDataType decides; actualDataType is not used.
-   */
-  public static ValueCompressonHolder.UnCompressValue unCompressMaxMin(DataType compDataType,
-      DataType actualDataType) {
-    final ValueCompressonHolder.UnCompressValue handler;
-    switch (compDataType) {
-      case DATA_BYTE:
-        handler = new UnCompressMaxMinByte();
-        break;
-      case DATA_SHORT:
-        handler = new UnCompressMaxMinShort();
-        break;
-      case DATA_INT:
-        handler = new UnCompressMaxMinInt();
-        break;
-      case DATA_LONG:
-        handler = new UnCompressMaxMinLong();
-        break;
-      case DATA_FLOAT:
-        handler = new UnCompressMaxMinFloat();
-        break;
-      default:
-        // every other type
-        handler = new UnCompressMaxMinDefault();
-        break;
-    }
-
-    return handler;
-  }
-
-  /**
-   * Picks the handler that restores values stored as whole numbers:
-   * value = value/Math.pow(10, decimal)
-   * Only compDataType decides; actualDataType is not used.
-   */
-  public static ValueCompressonHolder.UnCompressValue unCompressNonDecimal(DataType compDataType,
-      DataType actualDataType) {
-    final ValueCompressonHolder.UnCompressValue handler;
-    switch (compDataType) {
-      case DATA_BYTE:
-        handler = new UnCompressNonDecimalByte();
-        break;
-      case DATA_SHORT:
-        handler = new UnCompressNonDecimalShort();
-        break;
-      case DATA_INT:
-        handler = new UnCompressNonDecimalInt();
-        break;
-      case DATA_LONG:
-        handler = new UnCompressNonDecimalLong();
-        break;
-      case DATA_FLOAT:
-        handler = new UnCompressNonDecimalFloat();
-        break;
-      default:
-        // every other type
-        handler = new UnCompressNonDecimalDefault();
-        break;
-    }
-
-    return handler;
-  }
-
-  /**
-   * Picks the handler that restores values stored as scaled distance to the max value:
-   * value = (maxValue - subValue)/Math.pow(10, decimal)
-   * Only compDataType decides; actualDataType is not used.
-   */
-  public static ValueCompressonHolder.UnCompressValue unCompressNonDecimalMaxMin(
-      DataType compDataType, DataType actualDataType) {
-    final ValueCompressonHolder.UnCompressValue handler;
-    switch (compDataType) {
-      case DATA_BYTE:
-        handler = new UnCompressNonDecimalMaxMinByte();
-        break;
-      case DATA_SHORT:
-        handler = new UnCompressNonDecimalMaxMinShort();
-        break;
-      case DATA_INT:
-        handler = new UnCompressNonDecimalMaxMinInt();
-        break;
-      case DATA_LONG:
-        handler = new UnCompressNonDecimalMaxMinLong();
-        break;
-      case DATA_FLOAT:
-        handler = new UnCompressNonDecimalMaxMinFloat();
-        break;
-      default:
-        // every other type
-        handler = new UnCompressNonDecimalMaxMinDefault();
-        break;
-    }
-
-    return handler;
-  }
-
-  /**
-   * Create Value compression model from per-measure arrays; the measure count is taken
-   * from the length of maxValue.
-   *
-   * @param maxValue         max value per measure
-   * @param minValue         min value per measure
-   * @param decimalLength    decimal length per measure
-   * @param uniqueValue      unique value per measure
-   * @param aggType          aggregator type per measure
-   * @param dataTypeSelected selected data type flag per measure
-   * @return the compression model built from the wrapped metadata
-   */
-  public static ValueCompressionModel getValueCompressionModel(Object[] maxValue, Object[] minValue,
-      int[] decimalLength, Object[] uniqueValue, char[] aggType, byte[] dataTypeSelected) {
-    int measureCount = maxValue.length;
-    return getValueCompressionModel(
-        new MeasureMetaDataModel(minValue, maxValue, decimalLength, measureCount, uniqueValue,
-            aggType, dataTypeSelected));
-  }
-
-  /** Builds the compression model, including the uncompress handlers, for all measures. */
-  public static ValueCompressionModel getValueCompressionModel(MeasureMetaDataModel measureMDMdl) {
-    int count = measureMDMdl.getMeasureCount();
-    Object[] mins = measureMDMdl.getMinValue();
-    Object[] maxs = measureMDMdl.getMaxValue();
-    Object[] uniques = measureMDMdl.getUniqueValue();
-    int[] decimals = measureMDMdl.getDecimal();
-    char[] aggTypes = measureMDMdl.getType();
-    byte[] selectedTypes = measureMDMdl.getDataTypeSelected();
-    ValueCompressionModel model = new ValueCompressionModel();
-    DataType[] actualTypes = new DataType[count];
-    DataType[] changedTypes = new DataType[count];
-    COMPRESSION_TYPE[] compTypes = new COMPRESSION_TYPE[count];
-    for (int m = 0; m < count; m++) {
-      CompressionFinder chosen =
-          getCompressionType(maxs[m], mins[m], decimals[m], aggTypes[m], selectedTypes[m]);
-      actualTypes[m] = chosen.actualDataType;
-      changedTypes[m] = chosen.changedDataType;
-      compTypes[m] = chosen.compType;
-    }
-    model.setMaxValue(maxs);
-    model.setDecimal(decimals);
-    model.setChangedDataType(changedTypes);
-    model.setCompType(compTypes);
-    model.setActualDataType(actualTypes);
-    model.setMinValue(mins);
-    model.setUniqueValue(uniques);
-    model.setType(aggTypes);
-    model.setMinValueFactForAgg(measureMDMdl.getMinValueFactForAgg());
-    model.setDataTypeSelected(selectedTypes);
-    // the handlers are created from what the model hands back, after all setters have run
-    model.setUnCompressValues(getUncompressedValues(model.getCompType(),
-        model.getActualDataType(), model.getChangedDataType()));
-    return model;
-  }
-
-  /** Packs the shorts into a byte array, 2 bytes per value, in ByteBuffer's default order. */
-  public static byte[] convertToBytes(short[] values) {
-    ByteBuffer bytes = ByteBuffer.allocate(values.length * 2);
-    // a bulk put through the short view fills the same backing array as value-wise putShort
-    bytes.asShortBuffer().put(values);
-    return bytes.array();
-  }
-
-  /** Packs the ints into a byte array, 4 bytes per value, in ByteBuffer's default order. */
-  public static byte[] convertToBytes(int[] values) {
-    ByteBuffer bytes = ByteBuffer.allocate(values.length * 4);
-    // a bulk put through the int view fills the same backing array as value-wise putInt
-    bytes.asIntBuffer().put(values);
-    return bytes.array();
-  }
-
-  /** Packs the floats into a byte array, 4 bytes per value, in ByteBuffer's default order. */
-  public static byte[] convertToBytes(float[] values) {
-    ByteBuffer bytes = ByteBuffer.allocate(values.length * 4);
-    // a bulk put through the float view fills the same backing array as value-wise putFloat
-    bytes.asFloatBuffer().put(values);
-    return bytes.array();
-  }
-
-  /** Packs the longs into a byte array, 8 bytes per value, in ByteBuffer's default order. */
-  public static byte[] convertToBytes(long[] values) {
-    ByteBuffer bytes = ByteBuffer.allocate(values.length * 8);
-    // a bulk put through the long view fills the same backing array as value-wise putLong
-    bytes.asLongBuffer().put(values);
-    return bytes.array();
-  }
-
-  /** Packs the doubles into a byte array, 8 bytes per value, in ByteBuffer's default order. */
-  public static byte[] convertToBytes(double[] values) {
-    ByteBuffer bytes = ByteBuffer.allocate(values.length * 8);
-    // a bulk put through the double view fills the same backing array as value-wise putDouble
-    bytes.asDoubleBuffer().put(values);
-    return bytes.array();
-  }
-
-  /** Rewinds the buffer and reads length / 2 shorts from its start with relative gets. */
-  public static short[] convertToShortArray(ByteBuffer buffer, int length) {
-    buffer.rewind();
-    short[] decoded = new short[length / 2];
-    for (int idx = 0; idx < decoded.length; idx++) {
-      decoded[idx] = buffer.getShort();
-    }
-    return decoded;
-  }
-
-  /** Rewinds the buffer and reads length / 4 ints from its start with relative gets. */
-  public static int[] convertToIntArray(ByteBuffer buffer, int length) {
-    buffer.rewind();
-    int[] decoded = new int[length / 4];
-    for (int idx = 0; idx < decoded.length; idx++) {
-      decoded[idx] = buffer.getInt();
-    }
-    return decoded;
-  }
-
-  /** Rewinds the buffer and reads length / 4 floats from its start with relative gets. */
-  public static float[] convertToFloatArray(ByteBuffer buffer, int length) {
-    buffer.rewind();
-    float[] decoded = new float[length / 4];
-    for (int idx = 0; idx < decoded.length; idx++) {
-      decoded[idx] = buffer.getFloat();
-    }
-    return decoded;
-  }
-
-  public static long[] convertToLongArray(ByteBuffer buffer, int length) {
-    buffer.rewind(); // decoding always starts at the beginning of the buffer
-    long[] decoded = new long[length / 8]; // length is presumably a byte count
-    for (int idx = 0; idx < decoded.length; idx++) {
-      decoded[idx] = buffer.getLong();
-    }
-    return decoded;
-  }
-
-  public static double[] convertToDoubleArray(ByteBuffer buffer, int length) {
-    buffer.rewind(); // decoding always starts at the beginning of the buffer
-    double[] decoded = new double[length / 8]; // length is presumably a byte count
-    for (int idx = 0; idx < decoded.length; idx++) {
-      decoded[idx] = buffer.getDouble();
-    }
-    return decoded;
-  }
-
-  /**
-   * use to identify compression type.
-   */
-  public static enum COMPRESSION_TYPE {
-    /**
-     * values are only cast to the changed data type
-     */
-    NONE,
-    /**
-     * maxValue - value is stored
-     */
-    MAX_MIN,
-    /**
-     * value * 10^decimal, rounded, is stored
-     */
-    NON_DECIMAL_CONVERT,
-    /**
-     * (maxValue - value) * 10^decimal, rounded, is stored
-     */
-    MAX_MIN_NDC,
-    /** custom byte array storage, chosen for aggregator type 'c' */
-    CUSTOM,
-    /** big decimal storage, chosen for aggregator type 'b' */
-    CUSTOM_BIGDECIMAL
-  }
-
-  /**
-   * use to identify the type of data.
-   * The constants keep their original declaration order; the width named on each
-   * constant is the value getSize returns for it.
-   */
-  public static enum DataType {
-    /**
-     * stored as byte, width 1
-     */
-    DATA_BYTE,
-    /**
-     * stored as short, width 2
-     */
-    DATA_SHORT,
-    /**
-     * stored as int, width 4
-     */
-    DATA_INT,
-    /**
-     * stored as float, width 4
-     */
-    DATA_FLOAT,
-    /**
-     * stored as long, width 8
-     */
-    DATA_LONG,
-    /**
-     * long measure (aggregator type 'l'), width 8
-     */
-    DATA_BIGINT,
-    /**
-     * stored as double, width 8
-     */
-    DATA_DOUBLE
-  }
-
-  /**
-   * One candidate compression choice. Candidates are ordered by the size of the changed
-   * data type first and by priority second; the caller sorts them ascending and takes the
-   * first one as the best compression type.
-   */
-  private static class CompressionFinder implements Comparable<CompressionFinder> {
-    /**
-     * compression type of this candidate.
-     */
-    private COMPRESSION_TYPE compType;
-    /**
-     * data type of the measure before compression.
-     */
-    private DataType actualDataType;
-    /**
-     * data type the measure is stored as.
-     */
-    private DataType changedDataType;
-    /**
-     * the size of changed data; stays 0 for the three-argument constructor
-     */
-    private int size;
-    /**
-     * tie breaker between candidates of equal size; stays null for the three-argument
-     * constructor
-     */
-    private PRIORITY priority;
-
-    /**
-     * Creates a choice that carries neither size nor priority.
-     *
-     * @param compType        compression type
-     * @param actualDataType  data type before compression
-     * @param changedDataType data type after compression
-     */
-    CompressionFinder(COMPRESSION_TYPE compType, DataType actualDataType,
-        DataType changedDataType) {
-      this.compType = compType;
-      this.actualDataType = actualDataType;
-      this.changedDataType = changedDataType;
-    }
-
-    /**
-     * Creates a sortable candidate; the size is derived from the changed data type.
-     *
-     * @param actualDataType  data type before compression
-     * @param changedDataType data type after compression
-     * @param priority        rank used when two candidates have the same size
-     * @param compType        compression type
-     */
-    CompressionFinder(DataType actualDataType, DataType changedDataType, PRIORITY priority,
-        COMPRESSION_TYPE compType) {
-      this(compType, actualDataType, changedDataType);
-      this.size = getSize(changedDataType);
-      this.priority = priority;
-    }
-
-    /**
-     * Compares size and priority only; compression type and data types are ignored.
-     *
-     * @param obj object to compare with
-     * @return true when obj is a CompressionFinder with the same size and priority
-     */
-    @Override public boolean equals(Object obj) {
-      if (!(obj instanceof CompressionFinder)) {
-        return false;
-      }
-      CompressionFinder other = (CompressionFinder) obj;
-      return size == other.size && priority == other.priority;
-    }
-
-    /**
-     * Built from the same two fields as equals: size and priority.
-     * @return hash code of this candidate
-     */
-    @Override public int hashCode() {
-      int priorityHash = (priority == null) ? 0 : priority.hashCode();
-      return 31 * (31 + size) + priorityHash;
-    }
-
-    /**
-     * Orders by size first, a smaller size sorting first, then by the numeric priority
-     * value, a lower value sorting first.
-     *
-     * @param o candidate to compare with
-     * @return -1, 0 or 1
-     */
-    @Override public int compareTo(CompressionFinder o) {
-      if (size != o.size) {
-        return size > o.size ? 1 : -1;
-      }
-      if (priority == o.priority) {
-        // same size and same priority: equal in the sense of equals()
-        return 0;
-      }
-      int mine = priority.priority;
-      int theirs = o.priority.priority;
-      return mine > theirs ? 1 : (mine < theirs ? -1 : 0);
-    }
-
-    /**
-     * Compression type priority.
-     * ACTUAL is the highest priority and DIFFNONDECIMAL is the lowest
-     * priority
-     */
-    static enum PRIORITY {
-      /**
-       * keep the actual data type (COMPRESSION_TYPE.NONE)
-       */
-      ACTUAL(0),
-      /**
-       * type of max - min (COMPRESSION_TYPE.MAX_MIN)
-       */
-      DIFFSIZE(1),
-      /**
-       * type of max * 10^decimal (COMPRESSION_TYPE.NON_DECIMAL_CONVERT)
-       */
-      MAXNONDECIMAL(2),
-      /**
-       * type of (max - min) * 10^decimal (COMPRESSION_TYPE.MAX_MIN_NDC)
-       */
-      DIFFNONDECIMAL(3);
-
-      /**
-       * priority.
-       */
-      private int priority;
-
-      private PRIORITY(int priority) {
-        this.priority = priority;
-      }
-    }
-  }
-
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/writer/ByteArrayHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/ByteArrayHolder.java b/core/src/main/java/org/carbondata/core/writer/ByteArrayHolder.java
deleted file mode 100644
index 599fe09..0000000
--- a/core/src/main/java/org/carbondata/core/writer/ByteArrayHolder.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.writer;
-
-import java.util.Arrays;
-
-import org.carbondata.core.util.ByteUtil;
-
-/**
- * Pairs an mdkey byte array with an int primary key.
- * Ordering and equality are decided by the mdkey bytes alone (through
- * ByteUtil.compare); the primary key is carried along but never compared.
- */
-public class ByteArrayHolder implements Comparable<ByteArrayHolder> {
-
-  /**
-   * mdkey
-   */
-  private final byte[] mdKey;
-
-  /**
-   * primary key
-   */
-  private final int primaryKey;
-
-  /**
-   * @param mdKey      key bytes; the array is stored by reference, not copied
-   * @param primaryKey primary key that belongs to the mdkey
-   */
-  public ByteArrayHolder(byte[] mdKey, int primaryKey) {
-    this.mdKey = mdKey;
-    this.primaryKey = primaryKey;
-  }
-
-  /**
-   * Compares the mdkey bytes of both holders.
-   */
-  @Override public int compareTo(ByteArrayHolder o) {
-    return ByteUtil.compare(mdKey, o.mdKey);
-  }
-
-  /**
-   * Two holders are equal when ByteUtil.compare reports 0 for their mdkeys.
-   */
-  @Override public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (!(obj instanceof ByteArrayHolder)) {
-      return false;
-    }
-    return 0 == ByteUtil.compare(mdKey, ((ByteArrayHolder) obj).mdKey);
-  }
-
-  /**
-   * Hashes only the mdkey, because equals() ignores the primary key. Mixing the
-   * primary key in would give equal holders different hash codes.
-   * NOTE(review): assumes ByteUtil.compare returns 0 only for arrays with the
-   * same content - confirm against ByteUtil.
-   */
-  @Override public int hashCode() {
-    return Arrays.hashCode(mdKey);
-  }
-
-  public byte[] getMdKey() {
-    return mdKey;
-  }
-
-  public int getPrimaryKey() {
-    return primaryKey;
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriter.java b/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriter.java
deleted file mode 100644
index e13c32f..0000000
--- a/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriter.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.core.writer;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Dictionary writer interface. Values can be handed over one at a time (as a
- * String or as bytes) or as a list of byte arrays. The writer is Closeable.
- */
-public interface CarbonDictionaryWriter extends Closeable {
-  /**
-   * Write method that accepts one String value at a time.
-   * This method can be used when data is huge and memory is less. In that
-   * case data can be stored to a file and an iterator can iterate over it and
-   * pass one value at a time.
-   *
-   * @param value unique dictionary value
-   * @throws IOException if an I/O error occurs
-   */
-  void write(String value) throws IOException;
-
-  /**
-   * Write method that accepts one value at a time, already encoded as bytes.
-   * This method can be used when data is huge and memory is less. In that
-   * case data can be stored to a file and an iterator can iterate over it and
-   * pass one value at a time.
-   *
-   * @param value unique dictionary value
-   * @throws IOException if an I/O error occurs
-   */
-  void write(byte[] value) throws IOException;
-
-  /**
-   * Write method that accepts a list of byte arrays as value.
-   * This can be used when data is less, then string can be converted
-   * to byte array for each value and added to a list.
-   *
-   * @param valueList list of byte array. Each byte array is unique dictionary value
-   * @throws IOException if an I/O error occurs
-   */
-  void write(List<byte[]> valueList) throws IOException;
-
-
-  /**
-   * Commits what has been written.
-   * NOTE(review): the contract is not stated in this interface. In
-   * CarbonDictionaryWriterImpl this call writes the dictionary metadata file;
-   * confirm whether callers are expected to invoke it after close().
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  void commit() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriterImpl.java b/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriterImpl.java
deleted file mode 100644
index 2508c86..0000000
--- a/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriterImpl.java
+++ /dev/null
@@ -1,422 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.writer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.carbondata.common.factory.CarbonCommonFactory;
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.carbon.CarbonTableIdentifier;
-import org.carbondata.core.carbon.ColumnIdentifier;
-import org.carbondata.core.carbon.path.CarbonTablePath;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.carbondata.core.datastorage.store.impl.FileFactory;
-import org.carbondata.core.reader.CarbonDictionaryColumnMetaChunk;
-import org.carbondata.core.reader.CarbonDictionaryMetadataReader;
-import org.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl;
-import org.carbondata.core.service.PathService;
-import org.carbondata.core.util.CarbonProperties;
-import org.carbondata.core.util.CarbonUtil;
-import org.carbondata.format.ColumnDictionaryChunk;
-import org.carbondata.format.ColumnDictionaryChunkMeta;
-
-import org.apache.thrift.TBase;
-
-/**
- * This class is responsible for writing the dictionary file and its metadata
- */
-public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter {
-
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonDictionaryWriterImpl.class.getName());
-
-  /**
-   * carbon type identifier
-   */
-  protected CarbonTableIdentifier carbonTableIdentifier;
-
-  /**
-   * list which will hold values upto maximum of one dictionary chunk size
-   */
-  private List<ByteBuffer> oneDictionaryChunkList;
-
-  /**
-   * Meta object which will hold last segment entry details
-   */
-  private CarbonDictionaryColumnMetaChunk chunkMetaObjectForLastSegmentEntry;
-
-  /**
-   * dictionary file and meta thrift writer
-   */
-  private ThriftWriter dictionaryThriftWriter;
-
-  /**
-   * column identifier
-   */
-  protected ColumnIdentifier columnIdentifier;
-
-  /**
-   * HDFS store path
-   */
-  protected String hdfsStorePath;
-
-  /**
-   * dictionary file path
-   */
-  protected String dictionaryFilePath;
-
-  /**
-   * dictionary metadata file path
-   */
-  protected String dictionaryMetaFilePath;
-
-  /**
-   * start offset of dictionary chunk  for a segment
-   */
-  private long chunk_start_offset;
-
-  /**
-   * end offset of a dictionary chunk for a segment
-   */
-  private long chunk_end_offset;
-
-  /**
-   * total dictionary value record count for one segment
-   */
-  private int totalRecordCount;
-
-  /**
-   * total thrift object chunk count written for one segment
-   */
-  private int chunk_count;
-
-  /**
-   * chunk size for a dictionary file after which data will be written to disk
-   */
-  private int dictionary_one_chunk_size;
-
-  /**
-   * flag to check whether write method is called for first time
-   */
-  private boolean isFirstTime;
-
-  private static final Charset defaultCharset = Charset.forName(
-      CarbonCommonConstants.DEFAULT_CHARSET);
-
-  /**
-   * Constructor
-   *
-   * @param hdfsStorePath         HDFS store path
-   * @param carbonTableIdentifier table identifier which will give table name and database name
-   * @param columnIdentifier      column unique identifier
-   */
-  public CarbonDictionaryWriterImpl(String hdfsStorePath,
-      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier) {
-    this.carbonTableIdentifier = carbonTableIdentifier;
-    this.columnIdentifier = columnIdentifier;
-    this.hdfsStorePath = hdfsStorePath;
-    this.isFirstTime = true;
-  }
-
-  /**
-   * This method will write the data in thrift format to disk. This method will be guided by
-   * parameter dictionary_one_chunk_size and data will be divided into chunks
-   * based on this parameter
-   *
-   * @param value unique dictionary value
-   * @throws IOException if an I/O error occurs
-   */
-  @Override public void write(String value) throws IOException {
-    write(value.getBytes(defaultCharset));
-  }
-
-  /**
-   * This method will write the data in thrift format to disk. This method will be guided by
-   * parameter dictionary_one_chunk_size and data will be divided into chunks
-   * based on this parameter
-   *
-   * @param value unique dictionary value
-   * @throws IOException if an I/O error occurs
-   */
-  @Override public void write(byte[] value) throws IOException {
-    if (isFirstTime) {
-      init();
-      isFirstTime = false;
-    }
-    // if one chunk size is equal to list size then write the data to file
-    checkAndWriteDictionaryChunkToFile();
-    oneDictionaryChunkList.add(ByteBuffer.wrap(value));
-    totalRecordCount++;
-  }
-
-  /**
-   * This method will write the data in thrift format to disk. This method will not be guided by
-   * parameter dictionary_one_chunk_size and complete data will be written as one chunk
-   *
-   * @param valueList list of byte array. Each byte array is unique dictionary value
-   * @throws IOException if an I/O error occurs
-   */
-  @Override public void write(List<byte[]> valueList) throws IOException {
-    if (isFirstTime) {
-      init();
-      isFirstTime = false;
-    }
-    for (byte[] value : valueList) {
-      oneDictionaryChunkList.add(ByteBuffer.wrap(value));
-      totalRecordCount++;
-    }
-  }
-
-  /**
-   * write dictionary metadata file and close thrift object
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  @Override public void close() throws IOException {
-    if (null != dictionaryThriftWriter) {
-      writeDictionaryFile();
-      // close the thrift writer for dictionary file
-      closeThriftWriter();
-    }
-  }
-
-  /**
-   * check if the threshold has been reached for the number of
-   * values that can kept in memory and then flush the data to file
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  private void checkAndWriteDictionaryChunkToFile() throws IOException {
-    if (oneDictionaryChunkList.size() >= dictionary_one_chunk_size) {
-      writeDictionaryFile();
-      createChunkList();
-    }
-  }
-
-  /**
-   * This method will serialize the object of dictionary file
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  private void writeDictionaryFile() throws IOException {
-    ColumnDictionaryChunk columnDictionaryChunk = new ColumnDictionaryChunk();
-    columnDictionaryChunk.setValues(oneDictionaryChunkList);
-    writeThriftObject(columnDictionaryChunk);
-  }
-
-  /**
-   * This method will check and created the directory path where dictionary file has to be created
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  private void init() throws IOException {
-    initDictionaryChunkSize();
-    initPaths();
-    boolean dictFileExists = CarbonUtil.isFileExists(this.dictionaryFilePath);
-    if (dictFileExists && CarbonUtil.isFileExists(this.dictionaryMetaFilePath)) {
-      this.chunk_start_offset = CarbonUtil.getFileSize(this.dictionaryFilePath);
-      validateDictionaryFileOffsetWithLastSegmentEntryOffset();
-    } else if (dictFileExists) {
-      FileFactory.getCarbonFile(dictionaryFilePath, FileFactory.getFileType(dictionaryFilePath))
-          .delete();
-    }
-    openThriftWriter(this.dictionaryFilePath);
-    createChunkList();
-  }
-
-  protected void initPaths() {
-    PathService pathService = CarbonCommonFactory.getPathService();
-    CarbonTablePath carbonTablePath = pathService.getCarbonTablePath(columnIdentifier,
-            this.hdfsStorePath, carbonTableIdentifier);
-    this.dictionaryFilePath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId());
-    this.dictionaryMetaFilePath =
-        carbonTablePath.getDictionaryMetaFilePath(columnIdentifier.getColumnId());
-  }
-
-  /**
-   * initialize the value of dictionary chunk that can be kept in memory at a time
-   */
-  private void initDictionaryChunkSize() {
-    try {
-      dictionary_one_chunk_size = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.DICTIONARY_ONE_CHUNK_SIZE,
-              CarbonCommonConstants.DICTIONARY_ONE_CHUNK_SIZE_DEFAULT));
-    } catch (NumberFormatException e) {
-      dictionary_one_chunk_size =
-          Integer.parseInt(CarbonCommonConstants.DICTIONARY_ONE_CHUNK_SIZE_DEFAULT);
-      LOGGER.error("Dictionary chunk size not configured properly. Taking default size "
-              + dictionary_one_chunk_size);
-    }
-  }
-
-  /**
-   * initialise one dictionary size chunk list and increment chunk count
-   */
-  private void createChunkList() {
-    this.oneDictionaryChunkList = new ArrayList<ByteBuffer>(dictionary_one_chunk_size);
-    chunk_count++;
-  }
-
-  /**
-   * if file already exists then read metadata file and
-   * validate the last entry end offset with file size. If
-   * they are not equal that means some invalid data is present which needs
-   * to be truncated
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  private void validateDictionaryFileOffsetWithLastSegmentEntryOffset() throws IOException {
-    // read last dictionary chunk meta entry from dictionary metadata file
-    chunkMetaObjectForLastSegmentEntry = getChunkMetaObjectForLastSegmentEntry();
-    int bytesToTruncate =
-        (int) (chunk_start_offset - chunkMetaObjectForLastSegmentEntry.getEnd_offset());
-    if (bytesToTruncate > 0) {
-      LOGGER.info("some inconsistency in dictionary file for column " + this.columnIdentifier);
-      // truncate the dictionary data till chunk meta end offset
-      FileFactory.FileType fileType = FileFactory.getFileType(this.dictionaryFilePath);
-      CarbonFile carbonFile = FileFactory.getCarbonFile(this.dictionaryFilePath, fileType);
-      boolean truncateSuccess = carbonFile
-          .truncate(this.dictionaryFilePath, chunkMetaObjectForLastSegmentEntry.getEnd_offset());
-      if (!truncateSuccess) {
-        LOGGER.info("Diction file not truncated successfully for column " + this.columnIdentifier);
-      }
-    }
-  }
-
-  /**
-   * This method will write the dictionary metadata file for a given column
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  private void writeDictionaryMetadataFile() throws IOException {
-    // Format of dictionary metadata file
-    // min, max, start offset, end offset and chunk count
-    int min_surrogate_key = 0;
-    int max_surrogate_key = 0;
-    // case 1: first time dictionary writing
-    // previousMax = 0, totalRecordCount = 5, min = 1, max= 5
-    // case2: file already exists
-    // previousMax = 5, totalRecordCount = 10, min = 6, max = 15
-    // case 3: no unique values, total records 0
-    // previousMax = 15, totalRecordCount = 0, min = 15, max = 15
-    // both min and max equal to previous max
-    if (null != chunkMetaObjectForLastSegmentEntry) {
-      if (0 == totalRecordCount) {
-        min_surrogate_key = chunkMetaObjectForLastSegmentEntry.getMax_surrogate_key();
-      } else {
-        min_surrogate_key = chunkMetaObjectForLastSegmentEntry.getMax_surrogate_key() + 1;
-      }
-      max_surrogate_key =
-          chunkMetaObjectForLastSegmentEntry.getMax_surrogate_key() + totalRecordCount;
-    } else {
-      if (totalRecordCount > 0) {
-        min_surrogate_key = 1;
-      }
-      max_surrogate_key = totalRecordCount;
-    }
-    ColumnDictionaryChunkMeta dictionaryChunkMeta =
-        new ColumnDictionaryChunkMeta(min_surrogate_key, max_surrogate_key, chunk_start_offset,
-            chunk_end_offset, chunk_count);
-    openThriftWriter(this.dictionaryMetaFilePath);
-    // write dictionary metadata file
-    writeThriftObject(dictionaryChunkMeta);
-    closeThriftWriter();
-    LOGGER.info("Dictionary metadata file written successfully for column " + this.columnIdentifier
-            + " at path " + this.dictionaryMetaFilePath);
-  }
-
-  /**
-   * open thrift writer for writing dictionary chunk/meta object
-   *
-   * @param dictionaryFile can be dictionary file name or dictionary metadata file name
-   * @throws IOException if an I/O error occurs
-   */
-  private void openThriftWriter(String dictionaryFile) throws IOException {
-    // create thrift writer instance
-    dictionaryThriftWriter = new ThriftWriter(dictionaryFile, true);
-    // open the file stream
-    dictionaryThriftWriter.open();
-  }
-
-  /**
-   * This method will write the thrift object to a file
-   *
-   * @param dictionaryThriftObject can be dictionary thrift object or dictionary metadata
-   *                               thrift object
-   * @throws IOException if an I/O error occurs
-   */
-  private void writeThriftObject(TBase dictionaryThriftObject) throws IOException {
-    dictionaryThriftWriter.write(dictionaryThriftObject);
-  }
-
-  /**
-   * close dictionary thrift writer
-   */
-  private void closeThriftWriter() {
-    if (null != dictionaryThriftWriter) {
-      dictionaryThriftWriter.close();
-    }
-  }
-
-  /**
-   * This method will read the dictionary chunk metadata thrift object for last entry
-   *
-   * @return last entry of dictionary meta chunk
-   * @throws IOException if an I/O error occurs
-   */
-  private CarbonDictionaryColumnMetaChunk getChunkMetaObjectForLastSegmentEntry()
-      throws IOException {
-    CarbonDictionaryColumnMetaChunk carbonDictionaryColumnMetaChunk = null;
-    CarbonDictionaryMetadataReader columnMetadataReaderImpl = getDictionaryMetadataReader();
-    try {
-      // read the last segment entry for dictionary metadata
-      carbonDictionaryColumnMetaChunk =
-          columnMetadataReaderImpl.readLastEntryOfDictionaryMetaChunk();
-    } finally {
-      // Close metadata reader
-      columnMetadataReaderImpl.close();
-    }
-    return carbonDictionaryColumnMetaChunk;
-  }
-
-  /**
-   * @return
-   */
-  protected CarbonDictionaryMetadataReader getDictionaryMetadataReader() {
-    return new CarbonDictionaryMetadataReaderImpl(hdfsStorePath, carbonTableIdentifier,
-        columnIdentifier);
-  }
-
-  @Override public void commit() throws IOException {
-    if (null != dictionaryThriftWriter) {
-      this.chunk_end_offset = CarbonUtil.getFileSize(this.dictionaryFilePath);
-      writeDictionaryMetadataFile();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/writer/CarbonFooterWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/CarbonFooterWriter.java b/core/src/main/java/org/carbondata/core/writer/CarbonFooterWriter.java
deleted file mode 100644
index 17b5686..0000000
--- a/core/src/main/java/org/carbondata/core/writer/CarbonFooterWriter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.core.writer;
-
-import java.io.IOException;
-
-import org.carbondata.format.FileFooter;
-
-/**
- * Writes metadata block to the fact table file in thrift format org.carbondata.format.FileFooter
- */
-public class CarbonFooterWriter {
-
-  // It is version number of this format class.
-  private static final int VERSION_NUMBER = 1;
-
-  // Fact file path
-  private final String filePath;
-
-  public CarbonFooterWriter(String filePath) {
-    this.filePath = filePath;
-  }
-
-  /**
-   * It writes FileFooter thrift format object to file. The footer's version is set to
-   * VERSION_NUMBER before it is written, followed by the given offset.
-   *
-   * @param footer          footer object to be written
-   * @param currentPosition At where this metadata is going to be written.
-   * @throws IOException if opening the writer or writing fails
-   */
-  public void writeFooter(FileFooter footer, long currentPosition) throws IOException {
-
-    ThriftWriter thriftWriter = openThriftWriter(filePath);
-    try {
-      // inside the try so that the writer is closed even when the footer is unusable
-      footer.setVersion(VERSION_NUMBER);
-      thriftWriter.write(footer);
-      thriftWriter.writeOffset(currentPosition);
-    } finally {
-      thriftWriter.close();
-    }
-  }
-
-  /**
-   * open thrift writer for writing the file footer
-   */
-  private ThriftWriter openThriftWriter(String filePath) throws IOException {
-    // create thrift writer instance
-    ThriftWriter thriftWriter = new ThriftWriter(filePath, true);
-    // open the file stream
-    thriftWriter.open();
-    return thriftWriter;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/writer/CarbonIndexFileWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/CarbonIndexFileWriter.java b/core/src/main/java/org/carbondata/core/writer/CarbonIndexFileWriter.java
deleted file mode 100644
index 5ae7b33..0000000
--- a/core/src/main/java/org/carbondata/core/writer/CarbonIndexFileWriter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.core.writer;
-
-import java.io.IOException;
-
-import org.apache.thrift.TBase;
-
-/**
- * Writer class which will be used to write thrift objects to the index file
- */
-public class CarbonIndexFileWriter {
-
-  /**
-   * thrift writer object, null until openThriftWriter has been called
-   */
-  private ThriftWriter thriftWriter;
-
-  /**
-   * It writes thrift object to file. openThriftWriter has to be called before this method.
-   *
-   * @param indexObject thrift object to be written
-   * @throws IOException
-   */
-  public void writeThrift(TBase indexObject) throws IOException {
-    thriftWriter.write(indexObject);
-  }
-
-  /**
-   * Below method will be used to open the thrift writer
-   *
-   * @param filePath file path where data need to be written
-   * @throws IOException throws io exception in case of any failure
-   */
-  public void openThriftWriter(String filePath) throws IOException {
-    // create thrift writer instance
-    thriftWriter = new ThriftWriter(filePath, true);
-    // open the file stream
-    thriftWriter.open();
-  }
-
-  /**
-   * Below method will be used to close the thrift object.
-   * It is a no-op when the writer was never opened.
-   */
-  public void close() {
-    if (null != thriftWriter) {
-      thriftWriter.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/writer/HierarchyValueWriterForCSV.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/HierarchyValueWriterForCSV.java b/core/src/main/java/org/carbondata/core/writer/HierarchyValueWriterForCSV.java
deleted file mode 100644
index d75ac6f..0000000
--- a/core/src/main/java/org/carbondata/core/writer/HierarchyValueWriterForCSV.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.writer;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.util.CarbonProperties;
-import org.carbondata.core.util.CarbonUtil;
-
-import org.pentaho.di.core.exception.KettleException;
-
-public class HierarchyValueWriterForCSV {
-
-  /**
-   * Comment for <code>LOGGER</code>
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(HierarchyValueWriterForCSV.class.getName());
-  /**
-   * hierarchyName
-   */
-  private String hierarchyName;
-
-  /**
-   * bufferedOutStream
-   */
-  private FileChannel outPutFileChannel;
-
-  /**
-   * storeFolderLocation
-   */
-  private String storeFolderLocation;
-
-  /**
-   * intialized
-   */
-  private boolean intialized;
-
-  /**
-   * counter the number of files.
-   */
-  private int counter;
-
-  /**
-   * byteArrayList
-   */
-  private List<ByteArrayHolder> byteArrayholder =
-      new ArrayList<ByteArrayHolder>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
-  /**
-   * toflush
-   */
-  private int toflush;
-
-  /**
-   * Stores the hierarchy name and store folder, derives the flush threshold from the
-   * sort size and graph row set size properties (the smaller of the two) and sets up
-   * the file counter from the files already present in the store folder.
-   *
-   * @param hierarchy           hierarchy name
-   * @param storeFolderLocation folder that holds the hierarchy files
-   */
-  public HierarchyValueWriterForCSV(String hierarchy, String storeFolderLocation) {
-    this.hierarchyName = hierarchy;
-    this.storeFolderLocation = storeFolderLocation;
-
-    CarbonProperties props = CarbonProperties.getInstance();
-    int sortSize = Integer.parseInt(props.getProperty(CarbonCommonConstants.SORT_SIZE,
-        CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
-    int rowSetSize = Integer.parseInt(props.getProperty(CarbonCommonConstants.GRAPH_ROWSET_SIZE,
-        CarbonCommonConstants.GRAPH_ROWSET_SIZE_DEFAULT));
-
-    // the threshold never exceeds the row set size
-    this.toflush = Math.min(sortSize, rowSetSize);
-
-    updateCounter(hierarchy, storeFolderLocation);
-  }
-
-  /**
-   * Gives access to the internal list of byte array holders (no copy is made).
-   *
-   * @return the byteArrayList
-   */
-  public List<ByteArrayHolder> getByteArrayList() throws KettleException {
-    return this.byteArrayholder;
-  }
-
-  /**
-   * @return the output file channel; null as long as no channel has been opened
-   */
-  public FileChannel getBufferedOutStream() {
-    return this.outPutFileChannel;
-  }
-
-  /**
-   * Sets the file counter from the files already present in the store folder.
-   * All files whose name contains meString are inspected; the counter ends up one above
-   * the highest trailing digit found. With no matching file the counter is 0.
-   * In-progress files whose trailing character is not a digit are either deleted or
-   * renamed to their name without the in-progress suffix (see the comments below).
-   *
-   * @param meString            text a file name must contain to be considered
-   * @param storeFolderLocation folder to scan
-   */
-  private void updateCounter(final String meString, String storeFolderLocation) {
-    File storeFolder = new File(storeFolderLocation);
-
-    File[] listFiles = storeFolder.listFiles(new FileFilter() {
-
-      @Override public boolean accept(File file) {
-        if (file.getName().indexOf(meString) > -1)
-
-        {
-          return true;
-        }
-        return false;
-      }
-    });
-
-    if (null == listFiles || listFiles.length == 0) {
-      counter = 0;
-      return;
-    }
-
-    for (File hierFile : listFiles) {
-      String hierFileName = hierFile.getName();
-
-      if (hierFileName.endsWith(CarbonCommonConstants.FILE_INPROGRESS_STATUS)) {
-        // drop everything from the last '.' on
-        hierFileName = hierFileName.substring(0, hierFileName.lastIndexOf('.'));
-        try {
-          // NOTE(review): only the last character is parsed, so a counter with more than
-          // one digit is read as its last digit - confirm this is intended
-          counter = Integer.parseInt(hierFileName.substring(hierFileName.length() - 1));
-        } catch (NumberFormatException nfe) {
-
-          // NOTE(review): this File is built from the bare file name without storeFolder,
-          // so it resolves against the working directory - confirm
-          if (new File(hierFileName + '0' + CarbonCommonConstants.LEVEL_FILE_EXTENSION).exists()) {
-            // Need to skip because the case can come in which server went down while files were
-            // merging and the other hierarchy files were not deleted, and the current file
-            // status is inprogress. so again we will merge the files and rename to normal file
-            LOGGER.info("Need to skip as this can be case in which hierarchy file already renamed");
-            if (hierFile.delete()) {
-              LOGGER.info("Deleted the Inprogress hierarchy Files.");
-            }
-          } else {
-            // levelfileName0.level file not exist that means files is merged and other
-            // files got deleted. while renaming this file from inprogress to normal file,
-            // server got restarted/killed. so we need to rename the file to normal.
-
-            File inprogressFile = new File(storeFolder + File.separator + hierFile.getName());
-            File changetoName = new File(storeFolder + File.separator + hierFileName);
-
-            if (inprogressFile.renameTo(changetoName)) {
-              LOGGER.info(
-                  "Renaming the level Files while creating the new instance on server startup.");
-            }
-
-          }
-
-        }
-      }
-
-      // last character of the (possibly shortened) name; getIntValue yields 0 for a non-digit
-      String val = hierFileName.substring(hierFileName.length() - 1);
-
-      int parsedVal = getIntValue(val);
-
-      if (counter < parsedVal) {
-        counter = parsedVal;
-      }
-    }
-    // the next file gets the number after the highest one seen
-    counter++;
-  }
-
-  /**
-   * Parses the given text as an integer, falling back to zero when it is not numeric.
-   *
-   * @param val text expected to hold an integer
-   * @return the parsed value, or 0 when {@code val} cannot be parsed
-   */
-  private int getIntValue(String val) {
-    try {
-      return Integer.parseInt(val);
-    } catch (NumberFormatException nfe) {
-      // a non-numeric value is tolerated here: it is only logged and treated as zero
-      LOGGER.info("Hierarchy File is already renamed so there will not be "
-          + "any need to keep the counter");
-      return 0;
-    }
-  }
-
-  /**
-   * Creates the next in-progress hierarchy file (named from the store folder, hierarchy name,
-   * current counter and the in-progress suffix) and opens the channel used to write into it.
-   * The writer is flagged as initialised only after the channel has been opened successfully.
-   *
-   * @throws KettleException if the file cannot be created or opened for writing
-   */
-  private void intialize() throws KettleException {
-    File f = new File(storeFolderLocation + File.separator + hierarchyName + counter
-        + CarbonCommonConstants.FILE_INPROGRESS_STATUS);
-
-    // the counter advances even if opening fails below; performRequiredOperation derives the
-    // name of the current file from (counter - 1)
-    counter++;
-
-    if (!f.exists()) {
-      boolean isFileCreated;
-      try {
-        isFileCreated = f.createNewFile();
-      } catch (IOException e) {
-        throw new KettleException("unable to create member mapping file", e);
-      }
-      if (!isFileCreated) {
-        throw new KettleException("unable to create file " + f.getAbsolutePath());
-      }
-    }
-
-    FileOutputStream fos = null;
-    try {
-      fos = new FileOutputStream(f);
-      outPutFileChannel = fos.getChannel();
-    } catch (FileNotFoundException e) {
-      closeStreamAndDeleteFile(f, outPutFileChannel, fos);
-      throw new KettleException("member Mapping File not found to write mapping info", e);
-    }
-
-    // set the flag last so that a failed attempt is retried on the next write instead of
-    // leaving the writer marked as ready without a usable channel
-    intialized = true;
-  }
-
-  /**
-   * Appends one record (the given bytes followed by the primary key) to the hierarchy file,
-   * opening the file first if that has not happened yet.
-   *
-   * @param bytes      record bytes to write
-   * @param primaryKey key stored as a 4-byte int directly after the record bytes
-   * @throws KettleException if the file cannot be initialised or the write fails
-   */
-  public void writeIntoHierarchyFile(byte[] bytes, int primaryKey) throws KettleException {
-    if (!intialized) {
-      intialize();
-    }
-
-    ByteBuffer byteBuffer = storeValueInCache(bytes, primaryKey);
-    byteBuffer.flip();
-
-    try {
-      // one write() call is not guaranteed to consume the whole buffer, so drain it explicitly
-      while (byteBuffer.hasRemaining()) {
-        outPutFileChannel.write(byteBuffer);
-      }
-    } catch (IOException e) {
-      throw new KettleException("Error while writting in the hierarchy mapping file", e);
-    }
-  }
-
-  /**
-   * Packs the record bytes followed by the primary key into a freshly allocated buffer.
-   * The buffer is returned unflipped, i.e. positioned after the last byte written.
-   *
-   * @param bytes      record bytes placed at the start of the buffer
-   * @param primaryKey value appended after the record as a 4-byte int
-   * @return buffer of {@code bytes.length + 4} bytes holding both values
-   */
-  private ByteBuffer storeValueInCache(byte[] bytes, int primaryKey) {
-    // 4 extra bytes hold the int primary key that trails the record bytes
-    ByteBuffer packed = ByteBuffer.allocate(bytes.length + 4);
-    packed.put(bytes).putInt(primaryKey);
-    return packed;
-  }
-
-  public void performRequiredOperation() throws KettleException {
-    if (byteArrayholder.size() == 0) {
-      return;
-    }
-    //write to the file and close the stream.
-    Collections.sort(byteArrayholder);
-
-    for (ByteArrayHolder byteArray : byteArrayholder) {
-      writeIntoHierarchyFile(byteArray.getMdKey(), byteArray.getPrimaryKey());
-    }
-
-    CarbonUtil.closeStreams(outPutFileChannel);
-
-    //rename the inprogress file to normal .level file
-    String filePath = this.storeFolderLocation + File.separator + hierarchyName + (counter - 1)
-        + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
-    File inProgressFile = new File(filePath);
-    String inprogressFileName = inProgressFile.getName();
-
-    String changedFileName = inprogressFileName.substring(0, inprogressFileName.lastIndexOf('.'));
-
-    File orgFinalName = new File(this.storeFolderLocation + File.separator + changedFileName);
-
-    if (!inProgressFile.renameTo(orgFinalName)) {
-      LOGGER.error("Not able to rename file : " + inprogressFileName);
-    }
-
-    //create the new outputStream
-    try {
-      intialize();
-    } catch (KettleException e) {
-      LOGGER.error("Not able to create output stream for file:" + hierarchyName + (counter - 1));
-    }
-
-    //clear the byte array holder also.
-    byteArrayholder.clear();
-  }
-
-  /**
-   * Closes every non-null stream and then deletes the given file. Failures to close or to
-   * delete are logged and not propagated.
-   *
-   * @param f       file to delete once the streams are closed
-   * @param streams streams to close; null entries are skipped
-   * @throws KettleException declared by the signature; not thrown by this implementation
-   */
-  private void closeStreamAndDeleteFile(File f, Closeable... streams) throws KettleException {
-    for (Closeable closeable : streams) {
-      if (null == closeable) {
-        continue;
-      }
-      try {
-        closeable.close();
-      } catch (IOException e) {
-        LOGGER.error(e, "unable to close the stream ");
-      }
-    }
-
-    if (!f.delete()) {
-      LOGGER.error("Unable to delete the file " + f.getAbsolutePath());
-    }
-  }
-
-  /**
-   * @return the hierarchy name used as the prefix of the files written by this instance
-   */
-  public String getHierarchyName() {
-    return this.hierarchyName;
-  }
-
-  /**
-   * @return the current value of the file counter
-   */
-  public int getCounter() {
-    return this.counter;
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/writer/ThriftWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/ThriftWriter.java b/core/src/main/java/org/carbondata/core/writer/ThriftWriter.java
deleted file mode 100644
index 2c5ee1d..0000000
--- a/core/src/main/java/org/carbondata/core/writer/ThriftWriter.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.writer;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.carbondata.core.datastorage.store.impl.FileFactory;
-import org.carbondata.core.util.CarbonUtil;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TIOStreamTransport;
-
-/**
- * Simple class that makes it easy to write Thrift objects to disk using the compact protocol.
- * The stream has to be opened with {@link #open()} before any of the write methods are used.
- */
-public class ThriftWriter {
-
-  /**
-   * buffer size passed to the file factory when the output stream is created
-   */
-  private static final int BUFFER_SIZE = 2048;
-
-  /**
-   * File to write to.
-   */
-  private final String fileName;
-
-  /**
-   * flag to append to existing file
-   */
-  private final boolean append;
-
-  /**
-   * For writing to the file; assigned by {@link #open()}.
-   */
-  private DataOutputStream dataOutputStream;
-
-  /**
-   * For binary serialization of objects; assigned by {@link #open()}.
-   */
-  private TProtocol binaryOut;
-
-  /**
-   * Constructor.
-   *
-   * @param fileName path of the file to write to
-   * @param append   whether the output stream is requested in append mode
-   */
-  public ThriftWriter(String fileName, boolean append) {
-    this.fileName = fileName;
-    this.append = append;
-  }
-
-  /**
-   * Open the file for writing.
-   *
-   * @throws IOException if the output stream cannot be created
-   */
-  public void open() throws IOException {
-    FileFactory.FileType fileType = FileFactory.getFileType(fileName);
-    dataOutputStream = FileFactory.getDataOutputStream(fileName, fileType, BUFFER_SIZE, append);
-    binaryOut = new TCompactProtocol(new TIOStreamTransport(dataOutputStream));
-  }
-
-  /**
-   * Write the object to disk and flush the underlying stream.
-   *
-   * @param t thrift object to serialize
-   * @throws IOException if serialization or flushing fails; a {@link TException} is wrapped
-   */
-  public void write(TBase<?, ?> t) throws IOException {
-    try {
-      t.write(binaryOut);
-      dataOutputStream.flush();
-    } catch (TException e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * Write the offset to the file
-   *
-   * @param offset value written as a long
-   * @throws IOException if the write fails
-   */
-  public void writeOffset(long offset) throws IOException {
-    dataOutputStream.writeLong(offset);
-  }
-
-  /**
-   * Close the file stream.
-   */
-  public void close() {
-    CarbonUtil.closeStreams(dataOutputStream);
-  }
-
-  /**
-   * Flush data to HDFS file. Only has an effect when the underlying stream is an
-   * {@link FSDataOutputStream}.
-   *
-   * @throws IOException if the sync fails
-   */
-  public void sync() throws IOException {
-    if (dataOutputStream instanceof FSDataOutputStream) {
-      ((FSDataOutputStream) dataOutputStream).hsync();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/writer/exception/CarbonDataWriterException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/exception/CarbonDataWriterException.java b/core/src/main/java/org/carbondata/core/writer/exception/CarbonDataWriterException.java
deleted file mode 100644
index 1e9ee18..0000000
--- a/core/src/main/java/org/carbondata/core/writer/exception/CarbonDataWriterException.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.core.writer.exception;
-
-import java.util.Locale;
-
-public class CarbonDataWriterException extends Exception {
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The error message handed to the constructor.
-   */
-  private final String msg;
-
-  /**
-   * Creates the exception with a message only.
-   *
-   * @param msg The error message for this exception.
-   */
-  public CarbonDataWriterException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Creates the exception with a message and the underlying cause.
-   *
-   * @param msg The error message for this exception.
-   * @param t   The cause of this exception.
-   */
-  public CarbonDataWriterException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * Delegates to the superclass implementation.
-   */
-  @Override public String getLocalizedMessage() {
-    return super.getLocalizedMessage();
-  }
-
-  /**
-   * @return the message this exception was created with
-   */
-  @Override public String getMessage() {
-    return this.msg;
-  }
-
-  /**
-   * Locale-specific variant of the message lookup; this implementation has no localized
-   * texts and always returns an empty string.
-   *
-   * @param locale - A Locale object represents a specific geographical,
-   *               political, or cultural region.
-   * @return - an empty string
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriter.java b/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriter.java
deleted file mode 100644
index 385efbe..0000000
--- a/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.core.writer.sortindex;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Interface for writing the dictionary sort index and the inverted sort index data.
- */
-public interface CarbonDictionarySortIndexWriter extends Closeable {
-
-  /**
-   * Writes the dictionary sort index data to the column's sort index file in thrift format.
-   *
-   * @param sortIndexList list of sortIndex
-   * @throws IOException In case any I/O error occurs.
-   */
-  void writeSortIndex(List<Integer> sortIndexList) throws IOException;
-
-  /**
-   * Writes the dictionary inverted sort index data to the column's sort index file in
-   * thrift format.
-   *
-   * @param invertedSortIndexList list of sortIndexInverted
-   * @throws IOException In case any I/O error occurs.
-   */
-  void writeInvertedSortIndex(List<Integer> invertedSortIndexList) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java b/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java
deleted file mode 100644
index 9c398cb..0000000
--- a/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.core.writer.sortindex;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-
-import org.carbondata.common.factory.CarbonCommonFactory;
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.carbon.CarbonTableIdentifier;
-import org.carbondata.core.carbon.ColumnIdentifier;
-import org.carbondata.core.carbon.path.CarbonTablePath;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.carbondata.core.datastorage.store.impl.FileFactory;
-import org.carbondata.core.service.PathService;
-import org.carbondata.core.util.CarbonProperties;
-import org.carbondata.core.util.CarbonUtil;
-import org.carbondata.core.writer.ThriftWriter;
-import org.carbondata.format.ColumnSortInfo;
-
-/**
- * Writer that collects the sort index and inverted sort index of a dictionary column and
- * persists them in thrift format when it is closed.
- */
-public class CarbonDictionarySortIndexWriterImpl implements CarbonDictionarySortIndexWriter {
-
-  /**
-   * Logger of this class
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonDictionarySortIndexWriterImpl.class.getName());
-
-  /**
-   * carbonTable Identifier holding the info of databaseName and tableName
-   */
-  protected CarbonTableIdentifier carbonTableIdentifier;
-
-  /**
-   * identifier of the column whose sort index is written
-   */
-  protected ColumnIdentifier columnIdentifier;
-
-  /**
-   * carbon store location
-   */
-  protected String carbonStorePath;
-
-  /**
-   * Path of the dictionary sort index file to be written; set by {@link #initPath()} and
-   * reset to null after each write attempt
-   */
-  protected String sortIndexFilePath;
-
-  /**
-   * Instance of thrift writer to write the data
-   */
-  private ThriftWriter sortIndexThriftWriter;
-
-  /**
-   * Column sort info thrift instance that accumulates both index lists.
-   */
-  private ColumnSortInfo columnSortInfo = new ColumnSortInfo();
-
-  /**
-   * @param carbonTableIdentifier table identifier which will give table name and database name
-   * @param columnIdentifier      column unique identifier
-   * @param carbonStorePath       Carbon store path
-   */
-  public CarbonDictionarySortIndexWriterImpl(final CarbonTableIdentifier carbonTableIdentifier,
-      final ColumnIdentifier columnIdentifier, final String carbonStorePath) {
-    this.carbonTableIdentifier = carbonTableIdentifier;
-    this.columnIdentifier = columnIdentifier;
-    this.carbonStorePath = carbonStorePath;
-  }
-
-  /**
-   * Stores the sort index list in the column sort info; nothing is written to disk until
-   * {@link #close()} is called.
-   *
-   * @param sortIndexList list of sortIndex
-   * @throws IOException declared by the interface
-   */
-  @Override public void writeSortIndex(List<Integer> sortIndexList) throws IOException {
-    columnSortInfo.setSort_index(sortIndexList);
-  }
-
-  /**
-   * Stores the inverted sort index list in the column sort info; nothing is written to disk
-   * until {@link #close()} is called.
-   *
-   * @param invertedSortIndexList list of sortIndexInverted
-   * @throws IOException declared by the interface
-   */
-  @Override public void writeInvertedSortIndex(List<Integer> invertedSortIndexList)
-      throws IOException {
-    columnSortInfo.setSort_index_inverted(invertedSortIndexList);
-  }
-
-  /**
-   * Writes the column sort info to the sort index file, but only when both the sort index
-   * and the inverted sort index have been populated. The file is opened in non-append mode.
-   * The writer is closed and the cached file path is reset whether or not the write succeeds.
-   *
-   * @throws IOException if the target folder cannot be created or the write fails
-   */
-  private void writeColumnSortInfo() throws IOException {
-    if (null == columnSortInfo.getSort_index() || null == columnSortInfo.sort_index_inverted) {
-      // incomplete sort info is never persisted
-      return;
-    }
-    initPath();
-    String parentFolder = CarbonTablePath.getFolderContainingFile(this.sortIndexFilePath);
-    boolean created = CarbonUtil.checkAndCreateFolder(parentFolder);
-    if (!created) {
-      LOGGER.error("Database metadata folder creation status :: " + created);
-      throw new IOException("Failed to created database metadata folder");
-    }
-    try {
-      this.sortIndexThriftWriter = new ThriftWriter(this.sortIndexFilePath, false);
-      this.sortIndexThriftWriter.open();
-      this.sortIndexThriftWriter.write(columnSortInfo);
-    } catch (IOException ie) {
-      LOGGER.error(ie,
-          "problem while writing the dictionary sort index file.");
-      throw new IOException("problem while writing the dictionary sort index file.", ie);
-    } finally {
-      if (null != sortIndexThriftWriter) {
-        this.sortIndexThriftWriter.close();
-      }
-      this.sortIndexFilePath = null;
-    }
-  }
-
-  /**
-   * Resolves the sort index file path from the current size of the column's dictionary file
-   * and then cleans up old sort index files.
-   */
-  protected void initPath() {
-    PathService pathService = CarbonCommonFactory.getPathService();
-    CarbonTablePath tablePath = pathService
-        .getCarbonTablePath(columnIdentifier, carbonStorePath, carbonTableIdentifier);
-    String dictionaryPath = tablePath.getDictionaryFilePath(columnIdentifier.getColumnId());
-    long dictionarySize = CarbonUtil.getFileSize(dictionaryPath);
-    this.sortIndexFilePath =
-        tablePath.getSortIndexFilePath(columnIdentifier.getColumnId(), dictionarySize);
-    cleanUpOldSortIndex(tablePath, dictionaryPath);
-  }
-
-  /**
-   * Deletes sort index files of this column that were last modified more than the configured
-   * maximum query execution time (in minutes) ago. The files are ordered by name and the last
-   * one in that order is never deleted.
-   *
-   * @param carbonTablePath table path used to list the sort index files
-   * @param dictPath        path of the dictionary file whose parent folder is scanned
-   */
-  protected void cleanUpOldSortIndex(CarbonTablePath carbonTablePath, String dictPath) {
-    CarbonFile dictionaryFile =
-        FileFactory.getCarbonFile(dictPath, FileFactory.getFileType(dictPath));
-    CarbonFile[] files = carbonTablePath
-        .getSortIndexFiles(dictionaryFile.getParentFile(), columnIdentifier.getColumnId());
-    int maxTime;
-    try {
-      maxTime = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
-    } catch (NumberFormatException e) {
-      // missing or malformed property: fall back to the default
-      maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
-    }
-    if (null == files) {
-      return;
-    }
-    Arrays.sort(files, new Comparator<CarbonFile>() {
-      @Override public int compare(CarbonFile left, CarbonFile right) {
-        return left.getName().compareTo(right.getName());
-      }
-    });
-    // the last file in name order is always kept
-    for (int idx = 0; idx < files.length - 1; idx++) {
-      CarbonFile candidate = files[idx];
-      long ageMillis = System.currentTimeMillis() - candidate.getLastModifiedTime();
-      long ageMinutes = ageMillis / (1000 * 60);
-      if (ageMinutes <= maxTime) {
-        continue;
-      }
-      if (candidate.delete()) {
-        LOGGER.info("Sort index file is deleted." + candidate.getAbsolutePath());
-      } else {
-        LOGGER.warn("Failed to delete sortindex file." + candidate.getAbsolutePath());
-      }
-    }
-  }
-
-  /**
-   * Writes the collected column sort info (when complete) and closes the thrift writer.
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  @Override public void close() throws IOException {
-    writeColumnSortInfo();
-    if (null != sortIndexThriftWriter) {
-      sortIndexThriftWriter.close();
-    }
-  }
-}