You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/10/01 01:43:19 UTC

[03/20] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
deleted file mode 100644
index 10b3ad5..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHolder> {
-
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(SortTempFileChunkHolder.class.getName());
-
-  /**
-   * temp file
-   */
-  private File tempFile;
-
-  /**
-   * read stream
-   */
-  private DataInputStream stream;
-
-  /**
-   * entry count
-   */
-  private int entryCount;
-
-  /**
-   * number record read
-   */
-  private int numberOfObjectRead;
-
-  /**
-   * return row
-   */
-  private Object[] returnRow;
-
-  /**
-   * number of measures
-   */
-  private int measureCount;
-
-  /**
-   * number of dimensionCount
-   */
-  private int dimensionCount;
-
-  /**
-   * number of complexDimensionCount
-   */
-  private int complexDimensionCount;
-
-  /**
-   * fileBufferSize for file reader stream size
-   */
-  private int fileBufferSize;
-
-  /**
-   * rows currently being served by readRow(); loaded by a non-backup DataFetcher
-   * or swapped in from backupBuffer
-   */
-  private Object[][] currentBuffer;
-
-  /**
-   * next batch of rows, filled by a DataFetcher created with backUp = true
-   */
-  private Object[][] backupBuffer;
-
-  /**
-   * set to true by the DataFetcher once backupBuffer holds a fresh batch and
-   * reset to false when that batch becomes currentBuffer
-   */
-  private boolean isBackupFilled;
-
-  /**
-   * whether prefetch mode is on; read from CARBON_MERGE_SORT_PREFETCH in initialize()
-   */
-  private boolean prefetch;
-
-  /**
-   * number of rows per buffer; read from CARBON_PREFETCH_BUFFERSIZE and replaced by
-   * sortTempFileNoOFRecordsInCompression when sort temp file compression is enabled
-   */
-  private int bufferSize;
-
-  /**
-   * index of the next row to hand out from currentBuffer
-   */
-  private int bufferRowCounter;
-
-  /**
-   * single-thread pool, created in the constructor, that runs the backup DataFetcher
-   */
-  private ExecutorService executorService;
-
-  /**
-   * handle of the most recently submitted backup DataFetcher
-   */
-  private Future<Void> submit;
-
-  /**
-   * number of rows handed out by readRow() in prefetch or compression mode;
-   * hasNext() compares it with entryCount in those modes
-   */
-  private int prefetchRecordsProceesed;
-
-  /**
-   * sortTempFileNoOFRecordsInCompression
-   */
-  private int sortTempFileNoOFRecordsInCompression;
-
-  /**
-   * isSortTempFileCompressionEnabled
-   */
-  private boolean isSortTempFileCompressionEnabled;
-
-  /**
-   * totalRecordFetch
-   */
-  private int totalRecordFetch;
-
-  private int noDictionaryCount;
-
-  private DataType[] aggType;
-
-  /**
-   * to store whether dimension is of dictionary type or not
-   */
-  private boolean[] isNoDictionaryDimensionColumn;
-
-  /**
-   * to store whether sort column is of dictionary type or not
-   */
-  private boolean[] isNoDictionarySortColumn;
-
-  /**
-   * Creates a holder for one sort temp file. Only the arguments are stored and the
-   * single-thread executor is created; the file itself is opened by
-   * {@link #initialize()}.
-   *
-   * @param tempFile                      sort temp file to read
-   * @param dimensionCount                number of dimensions
-   * @param complexDimensionCount         number of complex dimensions
-   * @param measureCount                  number of measures
-   * @param fileBufferSize                buffer size used for the file input stream
-   * @param noDictionaryCount             number of no-dictionary dimensions
-   * @param aggType                       data type of every measure
-   * @param isNoDictionaryDimensionColumn per dimension, true if it is a no-dictionary column
-   * @param isNoDictionarySortColumn      per sort column, true if it is a no-dictionary column
-   */
-  public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
-      int measureCount, int fileBufferSize, int noDictionaryCount, DataType[] aggType,
-      boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn) {
-    // source file and the stream buffer size used to read it
-    this.tempFile = tempFile;
-    this.fileBufferSize = fileBufferSize;
-
-    // layout of a row
-    this.dimensionCount = dimensionCount;
-    this.noDictionaryCount = noDictionaryCount;
-    this.complexDimensionCount = complexDimensionCount;
-    this.measureCount = measureCount;
-    this.aggType = aggType;
-
-    // dictionary / no-dictionary flags
-    this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn;
-    this.isNoDictionarySortColumn = isNoDictionarySortColumn;
-
-    // worker used to fill the backup buffer
-    this.executorService = Executors.newFixedThreadPool(1);
-  }
-
-  /**
-   * Reads the prefetch and compression settings from {@link CarbonProperties} and
-   * then opens the temp file through {@link #initialise()}.
-   *
-   * @throws CarbonSortKeyAndGroupByException problem while initializing
-   */
-  public void initialize() throws CarbonSortKeyAndGroupByException {
-    CarbonProperties properties = CarbonProperties.getInstance();
-    prefetch = Boolean.parseBoolean(
-        properties.getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
-            CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
-    bufferSize = Integer.parseInt(
-        properties.getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
-            CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
-    this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(
-        properties.getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
-            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
-    if (this.isSortTempFileCompressionEnabled) {
-      LOGGER.info("Compression was used while writing the sortTempFile");
-    }
-
-    // records per compressed chunk; anything unparsable or below 1 falls back to the default
-    int recordsPerChunk;
-    try {
-      recordsPerChunk = Integer.parseInt(
-          properties.getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
-              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
-      if (recordsPerChunk < 1) {
-        LOGGER.error("Invalid value for: "
-            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-            + ": Only Positive Integer value(greater than zero) is allowed.Default value will"
-            + " be used");
-        recordsPerChunk = Integer.parseInt(
-            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-      }
-    } catch (NumberFormatException e) {
-      LOGGER.error(
-          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-              + ", only Positive Integer value is allowed.Default value will be used");
-      recordsPerChunk = Integer.parseInt(
-          CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-    }
-    this.sortTempFileNoOFRecordsInCompression = recordsPerChunk;
-
-    initialise();
-  }
-
-  /**
-   * Opens the temp file, reads the entry count stored at its start and, depending on
-   * the configured mode, loads the first buffer of rows. In prefetch mode a
-   * background fetch of the next buffer is also scheduled when more rows remain.
-   *
-   * @throws CarbonSortKeyAndGroupByException if the file does not exist or cannot be read
-   */
-  private void initialise() throws CarbonSortKeyAndGroupByException {
-    try {
-      if (isSortTempFileCompressionEnabled) {
-        // with compression the buffer holds exactly one chunk of records
-        this.bufferSize = sortTempFileNoOFRecordsInCompression;
-      }
-      stream = new DataInputStream(
-          new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize));
-      this.entryCount = stream.readInt();
-      if (prefetch) {
-        new DataFetcher(false).call();
-        totalRecordFetch += currentBuffer.length;
-        if (totalRecordFetch < this.entryCount) {
-          submit = executorService.submit(new DataFetcher(true));
-        }
-      } else if (isSortTempFileCompressionEnabled) {
-        new DataFetcher(false).call();
-      }
-    } catch (FileNotFoundException e) {
-      LOGGER.error(e);
-      throw new CarbonSortKeyAndGroupByException(tempFile + " Not Found", e);
-    } catch (IOException e) {
-      // the file exists but could not be read (e.g. truncated header), so do not
-      // report it as missing
-      LOGGER.error(e);
-      throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
-    } catch (Exception e) {
-      LOGGER.error(e);
-      throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
-    }
-  }
-
-  /**
-   * Reads the next row and makes it available through {@link #getRow()}.
-   * <ul>
-   * <li>prefetch mode: the row comes from the double-buffered prefetch</li>
-   * <li>compression mode without prefetch: rows are served from currentBuffer, which
-   * is refilled synchronously once it is exhausted</li>
-   * <li>otherwise: the row is read directly from the stream</li>
-   * </ul>
-   *
-   * @throws CarbonSortKeyAndGroupByException problem while reading
-   */
-  public void readRow() throws CarbonSortKeyAndGroupByException {
-    if (prefetch) {
-      fillDataForPrefetch();
-    } else if (isSortTempFileCompressionEnabled) {
-      if (bufferRowCounter >= bufferSize) {
-        // totalRecordFetch is not advanced anywhere else on this path. Without this
-        // the DataFetcher would always ask for a full buffer and run past the end of
-        // the file on the last, shorter batch. Every row fetched so far has been
-        // handed out at this point, so the consumed count equals the fetched count.
-        totalRecordFetch = prefetchRecordsProceesed;
-        try {
-          new DataFetcher(false).call();
-          bufferRowCounter = 0;
-        } catch (Exception e) {
-          LOGGER.error(e);
-          throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
-        }
-
-      }
-      prefetchRecordsProceesed++;
-      returnRow = currentBuffer[bufferRowCounter++];
-    } else {
-      this.returnRow = getRowFromStream();
-    }
-  }
-
-  private void fillDataForPrefetch() {
-    if (bufferRowCounter >= bufferSize) {
-      if (isBackupFilled) {
-        bufferRowCounter = 0;
-        currentBuffer = backupBuffer;
-        totalRecordFetch += currentBuffer.length;
-        isBackupFilled = false;
-        if (totalRecordFetch < this.entryCount) {
-          submit = executorService.submit(new DataFetcher(true));
-        }
-      } else {
-        try {
-          submit.get();
-        } catch (Exception e) {
-          LOGGER.error(e);
-        }
-        bufferRowCounter = 0;
-        currentBuffer = backupBuffer;
-        isBackupFilled = false;
-        totalRecordFetch += currentBuffer.length;
-        if (totalRecordFetch < this.entryCount) {
-          submit = executorService.submit(new DataFetcher(true));
-        }
-      }
-    }
-    prefetchRecordsProceesed++;
-    returnRow = currentBuffer[bufferRowCounter++];
-  }
-
-  /**
-   * Reads one row from the stream, in this order: the dimension values, the complex
-   * dimension values and then the measure values.
-   *
-   * @return three-element row assembled by NonDictionaryUtil.prepareOutObj from the
-   * dictionary values, the byte-array values and the measures
-   * @throws CarbonSortKeyAndGroupByException if the stream cannot be read
-   */
-  private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
-    // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
-
-    Object[] holder = new Object[3];
-    int index = 0;
-    int nonDicIndex = 0;
-    int[] dim = new int[this.dimensionCount - this.noDictionaryCount];
-    // no-dictionary and complex dimension values share one array of byte arrays
-    byte[][] nonDicArray = new byte[this.noDictionaryCount + this.complexDimensionCount][];
-    Object[] measures = new Object[this.measureCount];
-    try {
-      // read dimension values
-      for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
-        if (isNoDictionaryDimensionColumn[i]) {
-          // no-dictionary value: a short length followed by that many bytes
-          short len = stream.readShort();
-          byte[] array = new byte[len];
-          stream.readFully(array);
-          nonDicArray[nonDicIndex++] = array;
-        } else {
-          // dictionary value: a single int
-          dim[index++] = stream.readInt();
-        }
-      }
-
-      // complex dimension values: a short length followed by that many bytes
-      for (int i = 0; i < complexDimensionCount; i++) {
-        short len = stream.readShort();
-        byte[] array = new byte[len];
-        stream.readFully(array);
-        nonDicArray[nonDicIndex++] = array;
-      }
-
-      index = 0;
-      // read measure values
-      for (int i = 0; i < this.measureCount; i++) {
-        // a flag byte precedes every measure; 1 means a value follows, anything
-        // else leaves the measure null
-        if (stream.readByte() == 1) {
-          switch (aggType[i]) {
-            case SHORT:
-              measures[index++] = stream.readShort();
-              break;
-            case INT:
-              measures[index++] = stream.readInt();
-              break;
-            case LONG:
-              measures[index++] = stream.readLong();
-              break;
-            case DOUBLE:
-              measures[index++] = stream.readDouble();
-              break;
-            case DECIMAL:
-              // decimal: an int length followed by the bytes handed to byteToBigDecimal
-              int len = stream.readInt();
-              byte[] buff = new byte[len];
-              stream.readFully(buff);
-              measures[index++] = DataTypeUtil.byteToBigDecimal(buff);
-              break;
-            default:
-              throw new IllegalArgumentException("unsupported data type:" + aggType[i]);
-          }
-        } else {
-          measures[index++] = null;
-        }
-      }
-
-      NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
-
-      // increment number if record read
-      this.numberOfObjectRead++;
-    } catch (IOException e) {
-      LOGGER.error("Problme while reading the madkey fom sort temp file");
-      throw new CarbonSortKeyAndGroupByException("Problem while reading the sort temp file ", e);
-    }
-
-    //return out row
-    return holder;
-  }
-
-  /**
-   * Returns the row loaded by the most recent {@link #readRow()} call.
-   *
-   * @return current row
-   */
-  public Object[] getRow() {
-    return returnRow;
-  }
-
-  /**
-   * below method will be used to check whether any more records are present
-   * in file or not
-   *
-   * @return more row present in file
-   */
-  public boolean hasNext() {
-    if (prefetch || isSortTempFileCompressionEnabled) {
-      return this.prefetchRecordsProceesed < this.entryCount;
-    }
-    return this.numberOfObjectRead < this.entryCount;
-  }
-
-  /**
-   * Closes the input stream, shuts the prefetch executor down and drops the
-   * references to both row buffers.
-   */
-  public void closeStream() {
-    CarbonUtil.closeStreams(stream);
-    executorService.shutdown();
-    currentBuffer = null;
-    backupBuffer = null;
-  }
-
-  /**
-   * Returns the number of entries read from the start of the temp file.
-   *
-   * @return entryCount
-   */
-  public int getEntryCount() {
-    return this.entryCount;
-  }
-
-  /**
-   * Orders two holders by their current rows, sort column by sort column.
-   * No-dictionary sort columns are compared as byte arrays, dictionary sort columns
-   * by their int value. Both holders must have a current row.
-   *
-   * @param other holder to compare with
-   * @return negative, zero or positive if this row sorts before, equal to or after
-   * the row of {@code other}
-   */
-  @Override public int compareTo(SortTempFileChunkHolder other) {
-    int dictionaryIndex = 0;
-    int noDictionaryIndex = 0;
-    int[] leftMdkArray = (int[]) returnRow[0];
-    int[] rightMdkArray = (int[]) other.returnRow[0];
-    byte[][] leftNonDictArray = (byte[][]) returnRow[1];
-    byte[][] rightNonDictArray = (byte[][]) other.returnRow[1];
-    for (boolean isNoDictionary : isNoDictionarySortColumn) {
-      int diff;
-      if (isNoDictionary) {
-        diff = UnsafeComparer.INSTANCE
-            .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]);
-        noDictionaryIndex++;
-      } else {
-        // Integer.compare instead of subtraction: a difference of two ints can
-        // overflow and flip the sign of the result
-        diff = Integer.compare(leftMdkArray[dictionaryIndex], rightMdkArray[dictionaryIndex]);
-        dictionaryIndex++;
-      }
-      if (diff != 0) {
-        return diff;
-      }
-    }
-    return 0;
-  }
-
-  /**
-   * Two holders are equal only if they are the very same instance.
-   *
-   * @param obj object to compare with
-   * @return true if {@code obj} is this holder
-   */
-  @Override public boolean equals(Object obj) {
-    return this == obj;
-  }
-
-  /**
-   * Hash built from the column counts and the temp file.
-   *
-   * @return hash code of this holder
-   */
-  @Override public int hashCode() {
-    int columnCounts = measureCount + dimensionCount + complexDimensionCount + noDictionaryCount;
-    return 31 * columnCounts + tempFile.hashCode();
-  }
-
-  /**
-   * Task that reads the next batch of rows from the temp file into either
-   * currentBuffer or backupBuffer. The batch size is fixed when the task is
-   * created: the smaller of bufferSize and the rows not yet fetched.
-   */
-  private final class DataFetcher implements Callable<Void> {
-    /** true to fill backupBuffer, false to fill currentBuffer */
-    private final boolean isBackUpFilling;
-
-    /** number of rows this task reads */
-    private final int numberOfRecords;
-
-    private DataFetcher(boolean backUp) {
-      isBackUpFilling = backUp;
-      int numberOfRecordsLeftToBeRead = entryCount - totalRecordFetch;
-      numberOfRecords = Math.min(bufferSize, numberOfRecordsLeftToBeRead);
-    }
-
-    /**
-     * Reads the batch and publishes it to the target buffer.
-     *
-     * @return always null
-     * @throws Exception if the rows cannot be read; the failure is logged and
-     *                   rethrown so that direct callers and Future.get() see it
-     *                   instead of continuing with an unfilled buffer
-     */
-    @Override public Void call() throws Exception {
-      try {
-        Object[][] records = prefetchRecordsFromFile(numberOfRecords);
-        if (isBackUpFilling) {
-          backupBuffer = records;
-          isBackupFilled = true;
-        } else {
-          currentBuffer = records;
-        }
-      } catch (Exception e) {
-        LOGGER.error(e);
-        throw e;
-      }
-      return null;
-    }
-
-  }
-
-  /**
-   * Reads the given number of rows from the sort temp file into a new buffer.
-   *
-   * @param numberOfRecords number of rows to read
-   * @return buffer holding the rows in file order
-   * @throws CarbonSortKeyAndGroupByException if a row cannot be read
-   */
-  private Object[][] prefetchRecordsFromFile(int numberOfRecords)
-      throws CarbonSortKeyAndGroupByException {
-    Object[][] buffer = new Object[numberOfRecords][];
-    int filled = 0;
-    while (filled < numberOfRecords) {
-      buffer[filled] = getRowFromStream();
-      filled++;
-    }
-    return buffer;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkWriter.java
deleted file mode 100644
index f0bac85..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkWriter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.File;
-
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-public class SortTempFileChunkWriter implements TempSortFileWriter {
-  /**
-   * writer
-   */
-  private TempSortFileWriter writer;
-
-  /**
-   * recordPerLeaf
-   */
-  private int recordPerLeaf;
-
-  /**
-   * CarbonCompressedSortTempFileChunkWriter
-   *
-   * @param writer
-   */
-  public SortTempFileChunkWriter(TempSortFileWriter writer, int recordPerLeaf) {
-    this.writer = writer;
-    this.recordPerLeaf = recordPerLeaf;
-  }
-
-  /**
-   * initialize
-   */
-  public void initiaize(File file, int entryCount) throws CarbonSortKeyAndGroupByException {
-    this.writer.initiaize(file, entryCount);
-  }
-
-  /**
-   * finish
-   */
-  public void finish() {
-    this.writer.finish();
-  }
-
-  /**
-   * Below method will be used to write the sort temp file chunk by chunk
-   */
-  public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
-    int recordCount = 0;
-    Object[][] tempRecords;
-    while (recordCount < records.length) {
-      if (records.length - recordCount < recordPerLeaf) {
-        recordPerLeaf = records.length - recordCount;
-      }
-      tempRecords = new Object[recordPerLeaf][];
-      System.arraycopy(records, recordCount, tempRecords, 0, recordPerLeaf);
-      recordCount += recordPerLeaf;
-      this.writer.writeSortTempFile(tempRecords);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileReader.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileReader.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileReader.java
deleted file mode 100644
index 2bf657e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-/**
- * Contract of a reader for sort temp files.
- */
-public interface TempSortFileReader {
-  /**
-   * Closes the file holder.
-   */
-  void finish();
-
-  /**
-   * Returns rows read from the temp file.
-   * NOTE(review): presumably one Object[] per row; no implementation is visible
-   * here, confirm against the implementing classes.
-   *
-   * @return rows as a two-dimensional array
-   */
-  Object[][] getRow();
-
-  /**
-   * Returns the total row count in the temp file.
-   *
-   * @return total number of rows in the temp file
-   */
-  int getEntryCount();
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriter.java
deleted file mode 100644
index 6679c8e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.File;
-
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-/**
- * Contract of a writer for sort temp files: initialize, write records, finish.
- */
-public interface TempSortFileWriter {
-  /**
-   * Initializes the writer.
-   *
-   * @param file       file handed to the writer (presumably the temp file to be
-   *                   written; confirm against the implementations)
-   * @param entryCount entry count handed to the writer (presumably the number of
-   *                   records the file will hold; confirm against the implementations)
-   * @throws CarbonSortKeyAndGroupByException if initialization fails
-   */
-  void initiaize(File file, int entryCount) throws CarbonSortKeyAndGroupByException;
-
-  /**
-   * Finishes writing.
-   */
-  void finish();
-
-  /**
-   * Writes the given records to the sort temp file.
-   *
-   * @param records records to write
-   * @throws CarbonSortKeyAndGroupByException if the records cannot be written
-   */
-  void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException;
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriterFactory.java
deleted file mode 100644
index c0e8c6e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/TempSortFileWriterFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-/**
- * Singleton factory that picks the compressed or uncompressed sort temp file writer.
- */
-public final class TempSortFileWriterFactory {
-  /**
-   * the only instance of this factory
-   */
-  private static final TempSortFileWriterFactory INSTANCE = new TempSortFileWriterFactory();
-
-  /**
-   * not instantiable from outside, use getInstance()
-   */
-  private TempSortFileWriterFactory() {
-  }
-
-  /**
-   * @return the shared factory instance
-   */
-  public static TempSortFileWriterFactory getInstance() {
-    return INSTANCE;
-  }
-
-  /**
-   * Creates a writer for sort temp files.
-   *
-   * @param isCompressionEnabled  true for a CompressedTempSortFileWriter, false for an
-   *                              UnCompressedTempSortFileWriter
-   * @param dimensionCount        number of dimensions
-   * @param complexDimensionCount number of complex dimensions
-   * @param measureCount          number of measures
-   * @param noDictionaryCount     number of no-dictionary dimensions
-   * @param writeBufferSize       write buffer size passed to the writer
-   * @return new writer instance
-   */
-  public TempSortFileWriter getTempSortFileWriter(boolean isCompressionEnabled, int dimensionCount,
-      int complexDimensionCount, int measureCount, int noDictionaryCount, int writeBufferSize) {
-    if (!isCompressionEnabled) {
-      return new UnCompressedTempSortFileWriter(dimensionCount, complexDimensionCount, measureCount,
-          noDictionaryCount, writeBufferSize);
-    }
-    return new CompressedTempSortFileWriter(dimensionCount, complexDimensionCount, measureCount,
-        noDictionaryCount, writeBufferSize);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
deleted file mode 100644
index 51b3964..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sortandgroupby.sortdata;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-public class UnCompressedTempSortFileWriter extends AbstractTempSortFileWriter {
-
-  /**
-   * UnCompressedTempSortFileWriter
-   *
-   * @param writeBufferSize
-   * @param dimensionCount
-   * @param measureCount
-   */
-  public UnCompressedTempSortFileWriter(int dimensionCount, int complexDimensionCount,
-      int measureCount, int noDictionaryCount, int writeBufferSize) {
-    super(dimensionCount, complexDimensionCount, measureCount, noDictionaryCount, writeBufferSize);
-  }
-
-  public static void writeDataOutputStream(Object[][] records, DataOutputStream dataOutputStream,
-      int measureCount, int dimensionCount, int noDictionaryCount, int complexDimensionCount)
-      throws IOException {
-    Object[] row;
-    for (int recordIndex = 0; recordIndex < records.length; recordIndex++) {
-      row = records[recordIndex];
-      int fieldIndex = 0;
-
-      for (int counter = 0; counter < dimensionCount; counter++) {
-        dataOutputStream.writeInt((Integer) NonDictionaryUtil.getDimension(fieldIndex++, row));
-      }
-
-      //write byte[] of high card dims
-      if (noDictionaryCount > 0) {
-        dataOutputStream.write(NonDictionaryUtil.getByteArrayForNoDictionaryCols(row));
-      }
-      fieldIndex = 0;
-      for (int counter = 0; counter < complexDimensionCount; counter++) {
-        int complexByteArrayLength = ((byte[]) row[fieldIndex]).length;
-        dataOutputStream.writeInt(complexByteArrayLength);
-        dataOutputStream.write(((byte[]) row[fieldIndex++]));
-      }
-
-      for (int counter = 0; counter < measureCount; counter++) {
-        if (null != row[fieldIndex]) {
-          dataOutputStream.write((byte) 1);
-          dataOutputStream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
-        } else {
-          dataOutputStream.write((byte) 0);
-        }
-
-        fieldIndex++;
-      }
-
-    }
-  }
-
-  /**
-   * Below method will be used to write the sort temp file
-   *
-   * @param records
-   */
-  public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
-    ByteArrayOutputStream blockDataArray = null;
-    DataOutputStream dataOutputStream = null;
-    int totalSize = 0;
-    int recordSize = 0;
-    try {
-      recordSize = (measureCount * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE) + (dimensionCount
-          * CarbonCommonConstants.INT_SIZE_IN_BYTE);
-      totalSize = records.length * recordSize;
-
-      blockDataArray = new ByteArrayOutputStream(totalSize);
-      dataOutputStream = new DataOutputStream(blockDataArray);
-
-      writeDataOutputStream(records, dataOutputStream, measureCount, dimensionCount,
-          noDictionaryCount, complexDimensionCount);
-      stream.writeInt(records.length);
-      byte[] byteArray = blockDataArray.toByteArray();
-      stream.writeInt(byteArray.length);
-      stream.write(byteArray);
-
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException(e);
-    } finally {
-      CarbonUtil.closeStreams(blockDataArray);
-      CarbonUtil.closeStreams(dataOutputStream);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java
deleted file mode 100644
index 39d1234..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.spliter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.scan.executor.QueryExecutor;
-import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.BatchResult;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-public abstract class AbstractCarbonQueryExecutor {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(AbstractCarbonQueryExecutor.class.getName());
-  protected CarbonTable carbonTable;
-  protected QueryModel queryModel;
-  protected QueryExecutor queryExecutor;
-  protected Map<String, TaskBlockInfo> segmentMapping;
-
-  /**
-   * get executor and execute the query model.
-   *
-   * @param blockList
-   * @return
-   */
-  protected CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
-      throws QueryExecutionException, IOException {
-    queryModel.setTableBlockInfos(blockList);
-    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
-    return queryExecutor.execute(queryModel);
-  }
-
-  /**
-   * Preparing of the query model.
-   *
-   * @param blockList
-   * @return
-   */
-  protected QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
-    QueryModel model = new QueryModel();
-    model.setTableBlockInfos(blockList);
-    model.setForcedDetailRawQuery(true);
-    model.setFilterExpressionResolverTree(null);
-
-    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
-    for (CarbonDimension dim : dimensions) {
-      // check if dimension is deleted
-      QueryDimension queryDimension = new QueryDimension(dim.getColName());
-      queryDimension.setDimension(dim);
-      dims.add(queryDimension);
-    }
-    model.setQueryDimension(dims);
-
-    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
-    for (CarbonMeasure carbonMeasure : measures) {
-      // check if measure is deleted
-      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
-      queryMeasure.setMeasure(carbonMeasure);
-      msrs.add(queryMeasure);
-    }
-    model.setQueryMeasures(msrs);
-    model.setQueryId(System.nanoTime() + "");
-    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
-    model.setTable(carbonTable);
-    return model;
-  }
-
-  /**
-   * Below method will be used
-   * for cleanup
-   */
-  public void finish() {
-    try {
-      queryExecutor.finish();
-    } catch (QueryExecutionException e) {
-      LOGGER.error(e, "Problem while finish: ");
-    }
-    clearDictionaryFromQueryModel();
-  }
-
-  /**
-   * This method will clear the dictionary access count after its usage is complete so
-   * that column can be deleted form LRU cache whenever memory reaches threshold
-   */
-  private void clearDictionaryFromQueryModel() {
-    if (null != queryModel) {
-      Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
-      if (null != columnToDictionaryMapping) {
-        for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
-          CarbonUtil.clearDictionaryCache(entry.getValue());
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java
deleted file mode 100644
index 7b724ee..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.spliter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;
-
-/**
- * Used to read carbon blocks when add/split partition
- */
-public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonSplitExecutor.class.getName());
-
-  public CarbonSplitExecutor(Map<String, TaskBlockInfo> segmentMapping, CarbonTable carbonTable) {
-    this.segmentMapping = segmentMapping;
-    this.carbonTable = carbonTable;
-  }
-
-  public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId)
-      throws QueryExecutionException, IOException {
-    List<TableBlockInfo> list = null;
-    queryModel = prepareQueryModel(list);
-    List<PartitionSpliterRawResultIterator> resultList
-        = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    TaskBlockInfo taskBlockInfo = segmentMapping.get(segmentId);
-    Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
-    for (String task : taskBlockListMapping) {
-      list = taskBlockInfo.getTableBlockInfoList(task);
-      LOGGER.info("for task -" + task + "-block size is -" + list.size());
-      queryModel.setTableBlockInfos(list);
-      resultList.add(new PartitionSpliterRawResultIterator(executeBlockList(list)));
-    }
-    return resultList;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
deleted file mode 100644
index 9316c9f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.spliter;
-
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.spliter.exception.AlterPartitionSliceException;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.CarbonFactHandler;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class RowResultProcessor {
-
-  private CarbonFactHandler dataHandler;
-  private SegmentProperties segmentProperties;
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(RowResultProcessor.class.getName());
-
-
-  public RowResultProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel,
-      SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
-    CarbonDataProcessorUtil.createLocations(tempStoreLocation);
-    this.segmentProperties = segProp;
-    String tableName = carbonTable.getFactTableName();
-    CarbonFactDataHandlerModel carbonFactDataHandlerModel =
-        CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
-            segProp, tableName, tempStoreLocation);
-    CarbonDataFileAttributes carbonDataFileAttributes =
-        new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
-            loadModel.getFactTimeStamp());
-    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
-    carbonFactDataHandlerModel.setBucketId(bucketId);
-    //Note: set compaction flow just to convert decimal type
-    carbonFactDataHandlerModel.setCompactionFlow(true);
-    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
-  }
-
-  public boolean execute(List<Object[]> resultList) {
-    boolean processStatus;
-    boolean isDataPresent = false;
-
-    try {
-      if (!isDataPresent) {
-        dataHandler.initialise();
-        isDataPresent = true;
-      }
-      for (Object[] row: resultList) {
-        addRow(row);
-      }
-      if (isDataPresent)
-      {
-        this.dataHandler.finish();
-      }
-      processStatus = true;
-    } catch (AlterPartitionSliceException e) {
-      LOGGER.error(e, e.getMessage());
-      LOGGER.error("Exception in executing RowResultProcessor" + e.getMessage());
-      processStatus = false;
-    } finally {
-      try {
-        if (isDataPresent) {
-          this.dataHandler.closeHandler();
-        }
-      } catch (Exception e) {
-        LOGGER.error("Exception while closing the handler in RowResultProcessor" + e.getMessage());
-        processStatus = false;
-      }
-    }
-    return processStatus;
-  }
-
-  private void addRow(Object[] carbonTuple) throws AlterPartitionSliceException {
-    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
-    try {
-      this.dataHandler.addDataToStore(row);
-    } catch (CarbonDataWriterException e) {
-      throw new AlterPartitionSliceException("Exception in adding rows in RowResultProcessor", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
deleted file mode 100644
index 0e53a1f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.spliter.exception;
-
-import java.util.Locale;
-
-public class AlterPartitionSliceException extends Exception {
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The Error message.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public AlterPartitionSliceException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public AlterPartitionSliceException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * This method is used to get the localized message.
-   *
-   * @param locale - A Locale object represents a specific geographical,
-   *               political, or cultural region.
-   * @return - Localized error message.
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-  /**
-   * getLocalizedMessage
-   */
-  @Override public String getLocalizedMessage() {
-    return super.getLocalizedMessage();
-  }
-
-  /**
-   * getMessage
-   */
-  public String getMessage() {
-    return this.msg;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/splits/TableSplit.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/splits/TableSplit.java b/processing/src/main/java/org/apache/carbondata/processing/splits/TableSplit.java
new file mode 100644
index 0000000..c7d5dd8
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/splits/TableSplit.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.splits;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.processing.partition.Partition;
+
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * It represents one region server as one split.
+ */
+public class TableSplit implements Serializable, Writable {
+  private static final long serialVersionUID = -8058151330863145575L;
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(TableSplit.class.getName());
+  private List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+
+  private Partition partition;
+
+  /**
+   * @return the locations
+   */
+  public List<String> getLocations() {
+    return locations;
+  }
+
+  /**
+   * @param locations the locations to set
+   */
+  public void setLocations(List<String> locations) {
+    this.locations = locations;
+  }
+
+  /**
+   * @return the partition held by this split.
+   */
+  public Partition getPartition() {
+    return partition;
+  }
+
+  /**
+   * @param partition the partition to set.
+   */
+  public void setPartition(Partition partition) {
+    this.partition = partition;
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+
+    int sizeLoc = in.readInt();
+    for (int i = 0; i < sizeLoc; i++) {
+      byte[] b = new byte[in.readInt()];
+      in.readFully(b);
+      locations.add(new String(b, Charset.defaultCharset()));
+    }
+
+    byte[] buf = new byte[in.readInt()];
+    in.readFully(buf);
+    ByteArrayInputStream bis = new ByteArrayInputStream(buf);
+    ObjectInputStream ois = new ObjectInputStream(bis);
+    try {
+      partition = (Partition) ois.readObject();
+    } catch (ClassNotFoundException e) {
+      LOGGER.error(e, e.getMessage());
+    }
+    ois.close();
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+
+    int sizeLoc = locations.size();
+    out.writeInt(sizeLoc);
+    for (int i = 0; i < sizeLoc; i++) {
+      byte[] bytes = locations.get(i).getBytes(Charset.defaultCharset());
+      out.writeInt(bytes.length);
+      out.write(bytes);
+    }
+
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+    ObjectOutputStream obs = new ObjectOutputStream(bos);
+    obs.writeObject(partition);
+    obs.close();
+    byte[] byteArray = bos.toByteArray();
+    out.writeInt(byteArray.length);
+    out.write(byteArray);
+  }
+
+  public String toString() {
+    return partition.getUniqueID() + ' ' + locations;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
index 0b606b0..b69815e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
@@ -17,20 +17,11 @@
 
 package org.apache.carbondata.processing.store;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
 /**
  * This class contains attributes of file which are required to
  * construct file name like taskId, factTimeStamp
  */
 public class CarbonDataFileAttributes {
-
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonDataFileAttributes.class.getName());
   /**
    * task Id which is unique for each spark task
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
index 0fe922d..7a5cc11 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
@@ -25,7 +25,7 @@ import org.apache.carbondata.processing.store.writer.v3.CarbonFactDataWriterImpl
 /**
  * Factory class to get the writer instance
  */
-public class CarbonDataWriterFactory {
+class CarbonDataWriterFactory {
 
   /**
    * static instance
@@ -56,7 +56,7 @@ public class CarbonDataWriterFactory {
    * @param carbonDataWriterVo writer vo object
    * @return writer instance
    */
-  public CarbonFactDataWriter<?> getFactDataWriter(final ColumnarFormatVersion version,
+  public CarbonFactDataWriter getFactDataWriter(final ColumnarFormatVersion version,
       final CarbonDataWriterVo carbonDataWriterVo) {
     switch (version) {
       case V1:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index c4a5fc5..2c275bf 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
 import org.apache.carbondata.processing.store.file.FileManager;
 import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
@@ -88,10 +88,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * once this size of input is reached
    */
   private int pageSize;
-  /**
-   * keyBlockHolder
-   */
-  private CarbonKeyBlockHolder[] keyBlockHolder;
 
   // This variable is true if it is dictionary dimension and its cardinality is lower than
   // property of CarbonCommonConstants.HIGH_CARDINALITY_VALUE
@@ -455,7 +451,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       this.dataWriter.closeWriter();
     }
     this.dataWriter = null;
-    this.keyBlockHolder = null;
   }
 
   /**
@@ -488,15 +483,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       //than below splitter will return column as {0,1,2}{3}{4}{5}
       ColumnarSplitter columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter();
       System.arraycopy(columnarSplitter.getBlockKeySize(), 0, keyBlockSize, 0, noOfColStore);
-      this.keyBlockHolder =
-          new CarbonKeyBlockHolder[columnarSplitter.getBlockKeySize().length];
-    } else {
-      this.keyBlockHolder = new CarbonKeyBlockHolder[0];
-    }
-
-    for (int i = 0; i < keyBlockHolder.length; i++) {
-      this.keyBlockHolder[i] = new CarbonKeyBlockHolder(pageSize);
-      this.keyBlockHolder[i].resetCounter();
     }
 
     // agg type
@@ -567,7 +553,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    *
    * @return data writer instance
    */
-  private CarbonFactDataWriter<?> getFactDataWriter() {
+  private CarbonFactDataWriter getFactDataWriter() {
     return CarbonDataWriterFactory.getInstance()
         .getFactDataWriter(version, getDataWriterVo());
   }
@@ -680,10 +666,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       return tablePage;
     }
 
-    /**
-     * @param encodedTablePage
-     * @param index
-     */
     public synchronized void put(TablePage tablePage, int index) {
       tablePages[index] = tablePage;
       // notify the consumer thread when index at which object is to be inserted

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 544a26a..2c346b2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -40,10 +40,10 @@ import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 // This class contains all the data required for processing and writing the carbon data

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonKeyBlockHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonKeyBlockHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonKeyBlockHolder.java
deleted file mode 100644
index 898917b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonKeyBlockHolder.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-public class CarbonKeyBlockHolder {
-  private byte[][] keyBlock;
-
-  private int counter;
-
-  public CarbonKeyBlockHolder(int size) {
-    keyBlock = new byte[size][];
-  }
-
-  public void addRowToBlock(int index, byte[] keyBlock) {
-    this.keyBlock[index] = keyBlock;
-    counter++;
-  }
-
-  public byte[][] getKeyBlock() {
-    if (counter < keyBlock.length) {
-      byte[][] temp = new byte[counter][];
-      System.arraycopy(keyBlock, 0, temp, 0, counter);
-      return temp;
-    }
-    return keyBlock;
-  }
-
-  public void resetCounter() {
-    counter = 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
deleted file mode 100644
index 48227d1..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.util.AbstractQueue;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortTempFileChunkHolder;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * Iterator over the rows of the sort temp files whose names start with the table name.
- * Each temp file is wrapped in a SortTempFileChunkHolder; the holders are kept in a
- * PriorityQueue and every call to next() takes the row of the head holder, then advances
- * that holder and puts it back into the queue.
- * The queue is created without a comparator, so the row order is defined by the natural
- * ordering of SortTempFileChunkHolder (not visible in this class).
- */
-public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
-  /**
-   * logger of this class
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(SingleThreadFinalSortFilesMerger.class.getName());
-
-  /**
-   * monitor guarding the additions to the holder queue made by the reader threads;
-   * it is static, so it is shared by every instance of this class
-   */
-  private static final Object LOCKOBJECT = new Object();
-
-  /**
-   * number of temp files that still have rows to return; set to the file count in
-   * startSorting and decremented each time a holder is exhausted
-   */
-  private int fileCounter;
-
-  /**
-   * buffer size handed to every chunk holder, as returned by
-   * CarbonDataProcessorUtil.getFileBufferSize
-   */
-  private int fileBufferSize;
-
-  /**
-   * queue of chunk holders, one per temp file; null until startSorting creates it
-   * and null again after clear()
-   */
-  private AbstractQueue<SortTempFileChunkHolder> recordHolderHeapLocal;
-
-  /**
-   * table name; only files whose name starts with it are merged
-   */
-  private String tableName;
-
-  /**
-   * measure count, passed through to the chunk holders
-   */
-  private int measureCount;
-
-  /**
-   * dimension count, passed through to the chunk holders
-   */
-  private int dimensionCount;
-
-  /**
-   * no dictionary count, passed through to the chunk holders
-   */
-  private int noDictionaryCount;
-
-  /**
-   * complex dimension count, passed through to the chunk holders
-   */
-  private int complexDimensionCount;
-
-  /**
-   * directories that are searched for temp files
-   */
-  private String[] tempFileLocation;
-
-  /**
-   * measure data types, passed through to the chunk holders
-   */
-  private DataType[] measureDataType;
-
-  /**
-   * flags telling whether a dimension is of no dictionary type or not,
-   * passed through to the chunk holders
-   */
-  private boolean[] isNoDictionaryColumn;
-
-  /**
-   * flags passed through to the chunk holders; presumably they mark the
-   * no dictionary sort columns - confirm against SortTempFileChunkHolder
-   */
-  private boolean[] isNoDictionarySortColumn;
-
-  /**
-   * Stores the arguments only; no file is touched until startFinalMerge is called.
-   *
-   * @param tempFileLocation         directories to search for temp files
-   * @param tableName                name prefix of the temp files to merge
-   * @param dimensionCount           passed to every chunk holder
-   * @param complexDimensionCount    passed to every chunk holder
-   * @param measureCount             passed to every chunk holder
-   * @param noDictionaryCount        passed to every chunk holder
-   * @param type                     measure data types, passed to every chunk holder
-   * @param isNoDictionaryColumn     passed to every chunk holder
-   * @param isNoDictionarySortColumn passed to every chunk holder
-   */
-  public SingleThreadFinalSortFilesMerger(String[] tempFileLocation, String tableName,
-      int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount,
-      DataType[] type, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) {
-    this.tempFileLocation = tempFileLocation;
-    this.tableName = tableName;
-    this.dimensionCount = dimensionCount;
-    this.complexDimensionCount = complexDimensionCount;
-    this.measureCount = measureCount;
-    this.measureDataType = type;
-    this.noDictionaryCount = noDictionaryCount;
-    this.isNoDictionaryColumn = isNoDictionaryColumn;
-    this.isNoDictionarySortColumn = isNoDictionarySortColumn;
-  }
-
-  /**
-   * Looks up the temp files to merge and, when there is at least one, opens them and
-   * fills the holder queue. When no file is found it only logs and returns, which
-   * leaves fileCounter at 0 so that hasNext() reports false.
-   *
-   * @throws CarbonDataWriterException if waiting for the reader threads fails
-   */
-  public void startFinalMerge() throws CarbonDataWriterException {
-    List<File> filesToMerge = getFilesToMergeSort();
-    if (filesToMerge.size() == 0)
-    {
-      LOGGER.info("No file to merge in final merge stage");
-      return;
-    }
-
-    startSorting(filesToMerge);
-  }
-
-  /**
-   * Lists, for every temp location, the files whose name starts with the table name.
-   * Locations for which listFiles returns null or an empty array are skipped.
-   *
-   * @return the matching files of all locations, possibly empty
-   */
-  private List<File> getFilesToMergeSort() {
-    FileFilter fileFilter = new FileFilter() {
-      public boolean accept(File pathname) {
-        return pathname.getName().startsWith(tableName);
-      }
-    };
-
-    // get all the merged files
-    List<File> files = new ArrayList<File>(tempFileLocation.length);
-    for (String tempLoc : tempFileLocation)
-    {
-      File[] subFiles = new File(tempLoc).listFiles(fileFilter);
-      if (null != subFiles && subFiles.length > 0)
-      {
-        files.addAll(Arrays.asList(subFiles));
-      }
-    }
-
-    return files;
-  }
-
-  /**
-   * Below method will be used to start the sorting process. It creates the
-   * record holder heap, then creates one chunk holder per given temp file on a
-   * fixed thread pool, reads the first record of each file and adds the holder
-   * to the heap. It returns after the pool has terminated or the wait has ended.
-   *
-   * @param files temp files to merge
-   * @throws CarbonDataWriterException if waiting for the reader threads fails
-   */
-  private void startSorting(List<File> files) throws CarbonDataWriterException {
-    this.fileCounter = files.size();
-    if (fileCounter == 0) {
-      LOGGER.info("No files to merge sort");
-      return;
-    }
-    this.fileBufferSize = CarbonDataProcessorUtil
-        .getFileBufferSize(this.fileCounter, CarbonProperties.getInstance(),
-            CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
-    LOGGER.info("Number of temp file: " + this.fileCounter);
-
-    LOGGER.info("File Buffer Size: " + this.fileBufferSize);
-
-    // create record holder heap
-    createRecordHolderQueue();
-
-    // iterate over file list and create chunk holder and add to heap
-    LOGGER.info("Started adding first record from each file");
-    // number of reader threads comes from the carbon properties; the default
-    // value is used when the configured value is not a number
-    int maxThreadForSorting = 0;
-    try {
-      maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD,
-              CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE));
-    } catch (NumberFormatException e) {
-      maxThreadForSorting =
-          Integer.parseInt(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE);
-    }
-    ExecutorService service = Executors.newFixedThreadPool(maxThreadForSorting);
-
-    for (final File tempFile : files) {
-
-      Runnable runnable = new Runnable() {
-        @Override public void run() {
-
-            // create chunk holder
-            SortTempFileChunkHolder sortTempFileChunkHolder =
-                new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
-                    measureCount, fileBufferSize, noDictionaryCount, measureDataType,
-                    isNoDictionaryColumn, isNoDictionarySortColumn);
-          try {
-            // initialize and read the first row
-            sortTempFileChunkHolder.initialize();
-            sortTempFileChunkHolder.readRow();
-          } catch (CarbonSortKeyAndGroupByException ex) {
-            // NOTE(review): the failure is only logged; the holder is still
-            // added to the heap below and the caller is not informed
-            LOGGER.error(ex);
-          }
-
-          // the queue is shared by all reader threads, so additions are serialised
-          synchronized (LOCKOBJECT) {
-            recordHolderHeapLocal.add(sortTempFileChunkHolder);
-          }
-        }
-      };
-      service.execute(runnable);
-    }
-    service.shutdown();
-
-    try {
-      // NOTE(review): the boolean result (false on timeout) is ignored, so a
-      // timeout is not distinguished from normal completion
-      service.awaitTermination(2, TimeUnit.HOURS);
-    } catch (Exception e) {
-      throw new CarbonDataWriterException(e.getMessage(), e);
-    }
-
-    LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
-  }
-
-  /**
-   * This method will be used to create the heap which will be used to hold
-   * the chunk holders. The heap gets fileCounter as initial capacity and no
-   * comparator, so it orders the holders by their natural ordering.
-   */
-  private void createRecordHolderQueue() {
-    // creating record holder heap
-    this.recordHolderHeapLocal = new PriorityQueue<SortTempFileChunkHolder>(fileCounter);
-  }
-
-  /**
-   * This method will be used to get the next sorted row
-   *
-   * @return sorted row
-   * @throws CarbonDataWriterException if reading the following row of the polled holder fails
-   */
-  public Object[] next() {
-    return getSortedRecordFromFile();
-  }
-
-  /**
-   * This method will be used to get the sorted record from file: it takes the
-   * row of the head holder of the heap and then either closes that holder (no
-   * more rows) or reads its next row and adds it back to the heap.
-   *
-   * @return sorted record
-   * @throws CarbonDataWriterException if reading the next row of the holder fails
-   */
-  private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
-    Object[] row = null;
-
-    // poll the top object from heap
-    // heap maintains binary tree which is based on heap condition that will
-    // be based on the ordering of the holders
-    // when we call poll it will always delete the root of the tree and then
-    // it does a trickle down operation, complexity is log(n)
-    // NOTE(review): poll() returns null on an empty queue and the result is
-    // not checked before use
-    SortTempFileChunkHolder poll = this.recordHolderHeapLocal.poll();
-
-    // get the row from chunk
-    row = poll.getRow();
-
-    // check if there is no entry present
-    if (!poll.hasNext()) {
-      // if chunk is empty then close the stream
-      poll.closeStream();
-
-      // change the file counter
-      --this.fileCounter;
-
-      // return row
-      return row;
-    }
-
-    // read new row
-    try {
-      poll.readRow();
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataWriterException(e.getMessage(), e);
-    }
-
-    // add to heap
-    this.recordHolderHeapLocal.add(poll);
-
-    // return row
-    return row;
-  }
-
-  /**
-   * This method will be used to check whether any more element is present or
-   * not
-   *
-   * @return true while at least one temp file has not been exhausted
-   */
-  public boolean hasNext() {
-    return this.fileCounter > 0;
-  }
-
-  /**
-   * Drops the reference to the holder heap.
-   * NOTE(review): holders that are still in the heap are not closed here.
-   */
-  public void clear() {
-    if (null != recordHolderHeapLocal) {
-      recordHolderHeapLocal = null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupDataHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupDataHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupDataHolder.java
deleted file mode 100644
index a2e22c2..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupDataHolder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.store.colgroup;
-
-
-/**
- * This will hold column group data: one byte[] per row plus the min max
- * tracker that is updated with every row added.
- */
-public class ColGroupDataHolder implements DataHolder {
-
-  /**
-   * number of row slots allocated in colGrpData
-   */
-  private int noOfRecords;
-
-  /**
-   * colGrpData[row no][data]
-   */
-  private byte[][] colGrpData;
-
-  /**
-   * This will have min max value of each chunk
-   */
-  private ColGroupMinMax colGrpMinMax;
-
-  /**
-   * each row size of this column group block
-   */
-  private int keyBlockSize;
-
-  /**
-   * @param keyBlockSize size of each row of this column group block
-   * @param noOfRecords  number of row slots to allocate
-   * @param colGrpMinMax tracker that receives every row passed to addData
-   */
-  public ColGroupDataHolder(int keyBlockSize,
-       int noOfRecords,ColGroupMinMax colGrpMinMax) {
-    this.noOfRecords = noOfRecords;
-    this.keyBlockSize = keyBlockSize;
-    this.colGrpMinMax = colGrpMinMax;
-    colGrpData = new byte[noOfRecords][];
-  }
-
-  /**
-   * Stores the row (by reference) at the given row index and feeds it to the
-   * min max tracker.
-   *
-   * @param rowsData row data
-   * @param rowIndex row number
-   */
-  @Override public void addData(byte[] rowsData, int rowIndex) {
-    colGrpData[rowIndex] = rowsData;
-    colGrpMinMax.add(rowsData);
-  }
-
-  /**
-   * this will return min of each chunk
-   *
-   * @return the value of getMin() of the min max tracker
-   */
-  public byte[] getMin() {
-    return colGrpMinMax.getMin();
-  }
-
-  /**
-   * this will return max of each chunk
-   *
-   * @return the value of getMax() of the min max tracker
-   */
-  public byte[] getMax() {
-    return colGrpMinMax.getMax();
-  }
-
-  /**
-   * Return size of this column group block
-   *
-   * @return the key block size given to the constructor
-   */
-  public int getKeyBlockSize() {
-    return keyBlockSize;
-  }
-
-  /**
-   * @return the backing row array itself, not a copy
-   */
-  @Override public byte[][] getData() {
-    return colGrpData;
-  }
-
-  /**
-   * return total size required by this block
-   *
-   * @return noOfRecords multiplied by keyBlockSize (plain int arithmetic)
-   */
-  public int getTotalSize() {
-    return noOfRecords * keyBlockSize;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMax.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMax.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMax.java
deleted file mode 100644
index b3d11f2..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMax.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.store.colgroup;
-
-import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.core.util.ByteUtil;
-
-/**
- * it gives min max of each column of column group. Every key passed to add()
- * is split into per-column masked bytes, and the smallest and largest masked
- * value seen so far is kept for each column.
- */
-public class ColGroupMinMax {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(ColGroupMinMax.class.getName());
-
-  /**
-   * key generator of the column group
-   */
-  private KeyGenerator keyGenerator;
-
-  /**
-   * no of column in column group
-   */
-  private int noOfCol;
-
-  /**
-   * min value of each column; an entry is null until the first add()
-   */
-  private byte[][] min;
-
-  /**
-   * max value of each column; an entry is null until the first add()
-   */
-  private byte[][] max;
-
-  /**
-   * for each column, the byte positions of the key that are read for it
-   */
-  private int[][] maskByteRange;
-
-  /**
-   * for each column, the key generated with Long.MAX_VALUE for that column
-   * and 0 for all other columns; used as bit mask in getMaskedKey
-   */
-  private byte[][] maxKeys;
-
-  /**
-   * Takes the key generator and the column count of the given column group
-   * from the segment properties and prepares the per-column mask tables.
-   *
-   * @param segmentProperties source of key generator and column count
-   * @param colGroupId        id of the column group
-   */
-  public ColGroupMinMax(SegmentProperties segmentProperties, int colGroupId) {
-    this.keyGenerator = segmentProperties.getColumnGroupAndItsKeygenartor().get(colGroupId);
-    this.noOfCol = segmentProperties.getNoOfColumnsInColumnGroup(colGroupId);
-    min = new byte[noOfCol][];
-    max = new byte[noOfCol][];
-    initialise();
-  }
-
-  /**
-   * Updates min and max of every column with the masked value that the given
-   * key holds for that column.
-   *
-   * @param mdkey key of one row
-   */
-  public void add(byte[] mdkey) {
-    for (int i = 0; i < noOfCol; i++) {
-      byte[] col = getMaskedKey(mdkey, maskByteRange[i], maxKeys[i]);
-      setMin(col, i);
-      setMax(col, i);
-    }
-  }
-
-  /**
-   * Below method will be used to get the masked key: for every position listed
-   * in maskByteRange the byte of data at that position is ANDed with the byte
-   * of maxKey at the same position.
-   *
-   * @param data          key bytes
-   * @param maskByteRange positions of data to read
-   * @param maxKey        mask, indexed by the same positions as data
-   * @return maskedKey, one byte per entry of maskByteRange
-   */
-  private byte[] getMaskedKey(byte[] data, int[] maskByteRange, byte[] maxKey) {
-    int keySize = maskByteRange.length;
-    byte[] maskedKey = new byte[keySize];
-    int counter = 0;
-    int byteRange = 0;
-    for (int i = 0; i < keySize; i++) {
-      byteRange = maskByteRange[i];
-      maskedKey[counter++] = (byte) (data[byteRange] & maxKey[byteRange]);
-    }
-    return maskedKey;
-  }
-
-  /**
-   * initialising data required for min max calculation: the mask byte range
-   * and the max key of every column.
-   * NOTE(review): a KeyGenException is only logged, which leaves the entries
-   * of the failing and all later columns null; add() would then fail with a
-   * NullPointerException.
-   */
-  private void initialise() {
-    try {
-      maskByteRange = new int[noOfCol][];
-      maxKeys = new byte[noOfCol][];
-      for (int i = 0; i < noOfCol; i++) {
-        maskByteRange[i] = getMaskByteRange(i);
-        // generating maxkey: Long.MAX_VALUE for column i, 0 for the others
-        long[] maxKey = new long[noOfCol];
-        maxKey[i] = Long.MAX_VALUE;
-        maxKeys[i] = keyGenerator.generateKey(maxKey);
-      }
-    } catch (KeyGenException e) {
-      LOGGER.error(e, "Key generation failed while evaulating column group min max");
-    }
-
-  }
-
-  /**
-   * get range for given column in generated md key. The key generator reports
-   * a first and a last byte offset (both inclusive); every offset in between
-   * is collected into a set and then copied into an array.
-   * NOTE(review): the array is filled in HashSet iteration order, which the
-   * HashSet contract does not define; ascending order is presumably expected.
-   *
-   * @param col : column
-   * @return maskByteRange
-   */
-  private int[] getMaskByteRange(int col) {
-    Set<Integer> integers = new HashSet<>();
-    int[] range = keyGenerator.getKeyByteOffsets(col);
-    for (int j = range[0]; j <= range[1]; j++) {
-      integers.add(j);
-    }
-    int[] byteIndexs = new int[integers.size()];
-    int j = 0;
-    for (Iterator<Integer> iterator = integers.iterator(); iterator.hasNext(); ) {
-      Integer integer = (Integer) iterator.next();
-      byteIndexs[j++] = integer.intValue();
-    }
-    return byteIndexs;
-  }
-
-  /**
-   * set min value of given column: the value is taken when no min is stored
-   * yet or when it compares lower than the stored min. The array is stored by
-   * reference.
-   *
-   * @param colData masked value of the column
-   * @param column  column index
-   */
-  private void setMin(byte[] colData, int column) {
-
-    if (null == min[column]) {
-      min[column] = colData;
-    } else {
-      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(colData, min[column]) < 0) {
-        min[column] = colData;
-      }
-    }
-  }
-
-  /**
-   * set max value of given column: the value is taken when no max is stored
-   * yet or when it compares greater than the stored max. The array is stored
-   * by reference.
-   *
-   * @param colData masked value of the column
-   * @param column  column index
-   */
-  private void setMax(byte[] colData, int column) {
-    if (null == max[column]) {
-      max[column] = colData;
-    } else {
-      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(colData, max[column]) > 0) {
-        max[column] = colData;
-      }
-
-    }
-  }
-
-  /**
-   * Get min value of block: the per-column min values concatenated in column
-   * order. Throws a NullPointerException when add() has not been called yet,
-   * because the per-column entries are still null.
-   *
-   * @return min value of block
-   */
-  public byte[] getMin() {
-    int size = 0;
-    for (int i = 0; i < noOfCol; i++) {
-      size += min[i].length;
-    }
-    ByteBuffer bb = ByteBuffer.allocate(size);
-    for (int i = 0; i < noOfCol; i++) {
-      bb.put(min[i]);
-    }
-    return bb.array();
-  }
-
-  /**
-   * get max value of block: the per-column max values concatenated in column
-   * order. Throws a NullPointerException when add() has not been called yet,
-   * because the per-column entries are still null.
-   *
-   * @return max value of block
-   */
-  public byte[] getMax() {
-    int size = 0;
-    for (int i = 0; i < noOfCol; i++) {
-      size += max[i].length;
-    }
-    ByteBuffer bb = ByteBuffer.allocate(size);
-    for (int i = 0; i < noOfCol; i++) {
-      bb.put(max[i]);
-    }
-    return bb.array();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColumnDataHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColumnDataHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColumnDataHolder.java
deleted file mode 100644
index 8caf339..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColumnDataHolder.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.store.colgroup;
-
-/**
- * Store mdkey data for each column block: one byte[] per row, kept in an
- * array whose length is fixed at construction.
- */
-public class ColumnDataHolder implements DataHolder {
-
-  /** row slots, indexed by row number */
-  private byte[][] data;
-
-  /**
-   * @param noOfRow number of row slots to allocate
-   */
-  public ColumnDataHolder(int noOfRow) {
-    data = new byte[noOfRow][];
-  }
-
-  /**
-   * Stores the row (by reference) at the given row index.
-   *
-   * @param rowRecord row data
-   * @param rowIndex  row number
-   */
-  @Override public void addData(byte[] rowRecord, int rowIndex) {
-    data[rowIndex] = rowRecord;
-  }
-
-  /**
-   * @return the backing row array itself, not a copy
-   */
-  @Override public byte[][] getData() {
-    return data;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/DataHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/DataHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/DataHolder.java
deleted file mode 100644
index 3b1ee81..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/DataHolder.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.store.colgroup;
-
-/**
- * Hold complete data for a leaf node, one byte[] per row.
- */
-public interface DataHolder {
-
-  /**
-   * add row to holder
-   *
-   * @param rowRecord row data
-   * @param rowIndex  row number
-   */
-  public void addData(byte[] rowRecord, int rowIndex);
-
-  /**
-   * return the data when required
-   *
-   * @return all data, indexed by row number
-   */
-  public byte[][] getData();
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index ec42596..acb3b3b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -64,7 +64,7 @@ import org.apache.carbondata.processing.store.file.FileData;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.io.IOUtils;
 
-public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<T> {
+public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(AbstractFactDataWriter.class.getName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index e195d10..3a2fa1c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.processing.store.TablePage;
 
-public interface CarbonFactDataWriter<T> {
+public interface CarbonFactDataWriter {
 
   /**
    * write a encoded table page