Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/10/01 01:43:24 UTC

[08/20] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
deleted file mode 100644
index 20d9894..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator;
-
-public class UnsafeInmemoryHolder implements SortTempChunkHolder {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnsafeInmemoryHolder.class.getName());
-
-  private int counter;
-
-  private int actualSize;
-
-  private UnsafeCarbonRowPage rowPage;
-
-  private Object[] currentRow;
-
-  private long address;
-
-  private NewRowComparator comparator;
-
-  private int columnSize;
-
-  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize,
-      int numberOfSortColumns) {
-    this.actualSize = rowPage.getBuffer().getActualSize();
-    this.rowPage = rowPage;
-    LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new NewRowComparator(rowPage.getNoDictionarySortColumnMapping());
-    this.columnSize = columnSize;
-  }
-
-  public boolean hasNext() {
-    return counter < actualSize;
-  }
-
-  public void readRow() {
-    currentRow = new Object[columnSize];
-    address = rowPage.getBuffer().get(counter);
-    rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset(), currentRow);
-    counter++;
-  }
-
-  public Object[] getRow() {
-    return currentRow;
-  }
-
-  @Override public int compareTo(SortTempChunkHolder o) {
-    return comparator.compare(currentRow, o.getRow());
-  }
-
-  @Override public boolean equals(Object obj) {
-    // holders are only equal when they are the same instance
-    return this == obj;
-  }
-
-  @Override public int hashCode() {
-    return super.hashCode();
-  }
-
-  public int numberOfRows() {
-    return actualSize;
-  }
-
-  public void close() {
-    rowPage.freeMemory();
-  }
-}
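
A note on the pattern: the holder deleted above, like the other SortTempChunkHolder
implementations in this commit, exposes a cursor-style contract (hasNext / readRow /
getRow / compareTo) that the mergers drive through a priority queue. A minimal,
self-contained sketch of that contract over a plain pre-sorted array (ArrayHolder and
its names are illustrative, not CarbonData API):

    import java.util.Arrays;
    import java.util.Comparator;

    // Hypothetical stand-in for SortTempChunkHolder: a cursor over pre-sorted rows.
    final class ArrayHolder implements Comparable<ArrayHolder> {
      private final Object[][] rows;                 // rows already sorted within this holder
      private final Comparator<Object[]> comparator;
      private int counter;                           // next row to read
      private Object[] currentRow;                   // row exposed via getRow()

      ArrayHolder(Object[][] rows, Comparator<Object[]> comparator) {
        this.rows = rows;
        this.comparator = comparator;
      }

      boolean hasNext() { return counter < rows.length; }

      void readRow() { currentRow = rows[counter++]; }  // advance the cursor

      Object[] getRow() { return currentRow; }

      @Override public int compareTo(ArrayHolder o) {
        // holders order themselves by their current row, so a heap of holders
        // always keeps the globally smallest pending row at its root
        return comparator.compare(currentRow, o.getRow());
      }

      public static void main(String[] args) {
        ArrayHolder h = new ArrayHolder(
            new Object[][] {{1, "a"}, {3, "b"}},
            Comparator.comparingInt(r -> (int) r[0]));
        while (h.hasNext()) {
          h.readRow();
          System.out.println(Arrays.toString(h.getRow()));
        }
      }
    }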

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
deleted file mode 100644
index fa4534f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparator;
-
-/**
- * It is used for merging unsafe in-memory intermediate data
- */
-public class UnsafeInmemoryMergeHolder implements Comparable<UnsafeInmemoryMergeHolder> {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnsafeInmemoryMergeHolder.class.getName());
-
-  private int counter;
-
-  private int actualSize;
-
-  private UnsafeCarbonRowPage rowPage;
-
-  private UnsafeCarbonRowForMerge currentRow;
-
-  private long address;
-
-  private UnsafeRowComparator comparator;
-
-  private Object baseObject;
-
-  private byte index;
-
-  public UnsafeInmemoryMergeHolder(UnsafeCarbonRowPage rowPage, byte index) {
-    this.actualSize = rowPage.getBuffer().getActualSize();
-    this.rowPage = rowPage;
-    LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new UnsafeRowComparator(rowPage);
-    this.baseObject = rowPage.getDataBlock().getBaseObject();
-    currentRow = new UnsafeCarbonRowForMerge();
-    this.index = index;
-  }
-
-  public boolean hasNext() {
-    return counter < actualSize;
-  }
-
-  public void readRow() {
-    address = rowPage.getBuffer().get(counter);
-    currentRow = new UnsafeCarbonRowForMerge();
-    currentRow.address = address + rowPage.getDataBlock().getBaseOffset();
-    currentRow.index = index;
-    counter++;
-  }
-
-  public UnsafeCarbonRowForMerge getRow() {
-    return currentRow;
-  }
-
-  @Override public int compareTo(UnsafeInmemoryMergeHolder o) {
-    return comparator.compare(currentRow, baseObject, o.getRow(), o.getBaseObject());
-  }
-
-  @Override public boolean equals(Object obj) {
-    // holders are only equal when they are the same instance
-    return this == obj;
-  }
-
-  @Override public int hashCode() {
-    return super.hashCode();
-  }
-
-  public Object getBaseObject() {
-    return baseObject;
-  }
-
-  public void close() {
-    rowPage.freeMemory();
-  }
-}
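
UnsafeCarbonRowForMerge identifies a row by a raw memory address (the page's base offset
plus the row's offset within the page) and the index of its owning page. A small sketch of
that off-heap addressing idea using sun.misc.Unsafe, obtained reflectively (illustrative
only; the real page layout is more involved):

    import java.lang.reflect.Field;
    import sun.misc.Unsafe;

    // Minimal sketch of the addressing behind UnsafeCarbonRowForMerge:
    // a "row" is just a base address plus an offset into an allocated block.
    public final class OffHeapDemo {
      public static void main(String[] args) throws Exception {
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        Unsafe unsafe = (Unsafe) f.get(null);

        long base = unsafe.allocateMemory(3 * Long.BYTES); // one small off-heap "page"
        try {
          for (int i = 0; i < 3; i++) {
            // row i lives at base + i * 8; store a long "row" there
            unsafe.putLong(base + (long) i * Long.BYTES, 100L + i);
          }
          for (int i = 0; i < 3; i++) {
            long address = base + (long) i * Long.BYTES; // analogous to offset + baseOffset
            System.out.println("row@" + address + " = " + unsafe.getLong(address));
          }
        } finally {
          unsafe.freeMemory(base); // mirrors rowPage.freeMemory() in close()
        }
      }
    }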

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
deleted file mode 100644
index f5316e6..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ /dev/null
@@ -1,472 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
-
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnsafeSortTempFileChunkHolder.class.getName());
-
-  /**
-   * temp file
-   */
-  private File tempFile;
-
-  /**
-   * read stream
-   */
-  private DataInputStream stream;
-
-  /**
-   * entry count
-   */
-  private int entryCount;
-
-  /**
-   * return row
-   */
-  private Object[] returnRow;
-
-  /**
-   * number of measures
-   */
-  private int measureCount;
-
-  /**
-   * number of dimension columns
-   */
-  private int dimensionCount;
-
-  /**
-   * number of complex dimension columns
-   */
-  private int complexDimensionCount;
-
-  /**
-   * fileBufferSize for file reader stream size
-   */
-  private int fileBufferSize;
-
-  private Object[][] currentBuffer;
-
-  private Object[][] backupBuffer;
-
-  private boolean isBackupFilled;
-
-  private boolean prefetch;
-
-  private int bufferSize;
-
-  private int bufferRowCounter;
-
-  private ExecutorService executorService;
-
-  private Future<Void> submit;
-
-  private int prefetchRecordsProceesed;
-
-  /**
-   * sortTempFileNoOFRecordsInCompression
-   */
-  private int sortTempFileNoOFRecordsInCompression;
-
-  /**
-   * isSortTempFileCompressionEnabled
-   */
-  private boolean isSortTempFileCompressionEnabled;
-
-  /**
-   * totalRecordFetch
-   */
-  private int totalRecordFetch;
-
-  private int noDictionaryCount;
-
-  private DataType[] measureDataType;
-
-  private int numberOfObjectRead;
-  /**
-   * to store whether dimension is of dictionary type or not
-   */
-  private boolean[] isNoDictionaryDimensionColumn;
-
-  private int nullSetWordsLength;
-
-  private Comparator<Object[]> comparator;
-
-  /**
-   * Constructor to initialize
-   */
-  public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) {
-    // set temp file
-    this.tempFile = tempFile;
-
-    // set measure and dimension count
-    this.measureCount = parameters.getMeasureColCount();
-    this.dimensionCount = parameters.getDimColCount();
-    this.complexDimensionCount = parameters.getComplexDimColCount();
-
-    this.noDictionaryCount = parameters.getNoDictionaryCount();
-    // set mdkey length
-    this.fileBufferSize = parameters.getFileBufferSize();
-    this.executorService = Executors.newFixedThreadPool(1);
-    this.measureDataType = parameters.getMeasureDataType();
-    this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
-    this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1;
-    comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
-    initialize();
-  }
-
-  /**
-   * This method will be used to initialize
-   *
-   * @throws CarbonSortKeyAndGroupByException problem while initializing
-   */
-  public void initialize() {
-    prefetch = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH,
-            CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT));
-    bufferSize = Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
-            CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT));
-    this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED,
-            CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE));
-    if (this.isSortTempFileCompressionEnabled) {
-      LOGGER.info("Compression was used while writing the sortTempFile");
-    }
-
-    try {
-      this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION,
-              CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE));
-      if (this.sortTempFileNoOFRecordsInCompression < 1) {
-        LOGGER.error("Invalid value for: "
-            + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-            + ": Only Positive Integer value(greater than zero) is allowed.Default value will"
-            + " be used");
-
-        this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(
-            CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-      }
-    } catch (NumberFormatException e) {
-      LOGGER.error(
-          "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
-              + ", only Positive Integer value is allowed.Default value will be used");
-      this.sortTempFileNoOFRecordsInCompression = Integer
-          .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE);
-    }
-
-    initialise();
-  }
-
-  private void initialise() {
-    try {
-      if (isSortTempFileCompressionEnabled) {
-        this.bufferSize = sortTempFileNoOFRecordsInCompression;
-      }
-      stream = new DataInputStream(
-          new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize));
-      this.entryCount = stream.readInt();
-      LOGGER.audit("Processing unsafe mode file rows with size : " + entryCount);
-      if (prefetch) {
-        new DataFetcher(false).call();
-        totalRecordFetch += currentBuffer.length;
-        if (totalRecordFetch < this.entryCount) {
-          submit = executorService.submit(new DataFetcher(true));
-        }
-      } else {
-        if (isSortTempFileCompressionEnabled) {
-          new DataFetcher(false).call();
-        }
-      }
-
-    } catch (FileNotFoundException e) {
-      LOGGER.error(e);
-      throw new RuntimeException(tempFile + " No Found", e);
-    } catch (IOException e) {
-      LOGGER.error(e);
-      throw new RuntimeException(tempFile + " No Found", e);
-    } catch (Exception e) {
-      LOGGER.error(e);
-      throw new RuntimeException(tempFile + " Problem while reading", e);
-    }
-  }
-
-  /**
-   * This method will be used to read new row from file
-   *
-   * @throws CarbonSortKeyAndGroupByException problem while reading
-   */
-  public void readRow() throws CarbonSortKeyAndGroupByException {
-    if (prefetch) {
-      fillDataForPrefetch();
-    } else if (isSortTempFileCompressionEnabled) {
-      if (bufferRowCounter >= bufferSize) {
-        try {
-          new DataFetcher(false).call();
-          bufferRowCounter = 0;
-        } catch (Exception e) {
-          LOGGER.error(e);
-          throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e);
-        }
-
-      }
-      prefetchRecordsProceesed++;
-      returnRow = currentBuffer[bufferRowCounter++];
-    } else {
-      this.returnRow = getRowFromStream();
-    }
-  }
-
-  private void fillDataForPrefetch() {
-    if (bufferRowCounter >= bufferSize) {
-      if (isBackupFilled) {
-        bufferRowCounter = 0;
-        currentBuffer = backupBuffer;
-        totalRecordFetch += currentBuffer.length;
-        isBackupFilled = false;
-        if (totalRecordFetch < this.entryCount) {
-          submit = executorService.submit(new DataFetcher(true));
-        }
-      } else {
-        try {
-          submit.get();
-        } catch (Exception e) {
-          LOGGER.error(e);
-        }
-        bufferRowCounter = 0;
-        currentBuffer = backupBuffer;
-        isBackupFilled = false;
-        totalRecordFetch += currentBuffer.length;
-        if (totalRecordFetch < this.entryCount) {
-          submit = executorService.submit(new DataFetcher(true));
-        }
-      }
-    }
-    prefetchRecordsProceesed++;
-    returnRow = currentBuffer[bufferRowCounter++];
-  }
-
-  /**
-   * @return the next row read from the sort temp file stream
-   * @throws CarbonSortKeyAndGroupByException if reading from the stream fails
-   */
-  private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
-    Object[] row = new Object[dimensionCount + measureCount];
-    try {
-      int dimCount = 0;
-      for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
-        if (isNoDictionaryDimensionColumn[dimCount]) {
-          short aShort = stream.readShort();
-          byte[] col = new byte[aShort];
-          stream.readFully(col);
-          row[dimCount] = col;
-        } else {
-          int anInt = stream.readInt();
-          row[dimCount] = anInt;
-        }
-      }
-
-      // write complex dimensions here.
-      for (; dimCount < dimensionCount; dimCount++) {
-        short aShort = stream.readShort();
-        byte[] col = new byte[aShort];
-        stream.readFully(col);
-        row[dimCount] = col;
-      }
-
-      long[] words = new long[nullSetWordsLength];
-      for (int i = 0; i < words.length; i++) {
-        words[i] = stream.readLong();
-      }
-
-      for (int mesCount = 0; mesCount < measureCount; mesCount++) {
-        if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
-          switch (measureDataType[mesCount]) {
-            case SHORT:
-              row[dimensionCount + mesCount] = stream.readShort();
-              break;
-            case INT:
-              row[dimensionCount + mesCount] = stream.readInt();
-              break;
-            case LONG:
-              row[dimensionCount + mesCount] = stream.readLong();
-              break;
-            case DOUBLE:
-              row[dimensionCount + mesCount] = stream.readDouble();
-              break;
-            case DECIMAL:
-              short aShort = stream.readShort();
-              byte[] bigDecimalInBytes = new byte[aShort];
-              stream.readFully(bigDecimalInBytes);
-              row[dimensionCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
-              break;
-            default:
-              throw new IllegalArgumentException("unsupported data type:" +
-                  measureDataType[mesCount]);
-          }
-        }
-      }
-      return row;
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException(e);
-    }
-  }
-
-  /**
-   * Below method will be used to get the row
-   *
-   * @return row
-   */
-  public Object[] getRow() {
-    return this.returnRow;
-  }
-
-  /**
-   * Below method will be used to check whether any more records are present
-   * in the file or not
-   *
-   * @return whether more rows are present in the file
-   */
-  public boolean hasNext() {
-    if (prefetch || isSortTempFileCompressionEnabled) {
-      return this.prefetchRecordsProceesed < this.entryCount;
-    }
-    return this.numberOfObjectRead < this.entryCount;
-  }
-
-  /**
-   * Below method will be used to close streams
-   */
-  public void close() {
-    CarbonUtil.closeStreams(stream);
-    executorService.shutdown();
-  }
-
-  /**
-   * This method will return the number of entries
-   *
-   * @return entryCount
-   */
-  public int numberOfRows() {
-    return entryCount;
-  }
-
-  @Override public int compareTo(SortTempChunkHolder other) {
-    return comparator.compare(returnRow, other.getRow());
-  }
-
-  @Override public boolean equals(Object obj) {
-    // holders are only equal when they are the same instance
-    return this == obj;
-  }
-
-  @Override public int hashCode() {
-    int hash = 0;
-    hash += 31 * measureCount;
-    hash += 31 * dimensionCount;
-    hash += 31 * complexDimensionCount;
-    hash += 31 * noDictionaryCount;
-    hash += tempFile.hashCode();
-    return hash;
-  }
-
-  private final class DataFetcher implements Callable<Void> {
-    private boolean isBackUpFilling;
-
-    private int numberOfRecords;
-
-    private DataFetcher(boolean backUp) {
-      isBackUpFilling = backUp;
-      calculateNumberOfRecordsToBeFetched();
-    }
-
-    private void calculateNumberOfRecordsToBeFetched() {
-      int numberOfRecordsLeftToBeRead = entryCount - totalRecordFetch;
-      numberOfRecords =
-          bufferSize < numberOfRecordsLeftToBeRead ? bufferSize : numberOfRecordsLeftToBeRead;
-    }
-
-    @Override public Void call() throws Exception {
-      try {
-        if (isBackUpFilling) {
-          backupBuffer = prefetchRecordsFromFile(numberOfRecords);
-          isBackupFilled = true;
-        } else {
-          currentBuffer = prefetchRecordsFromFile(numberOfRecords);
-        }
-      } catch (Exception e) {
-        LOGGER.error(e);
-      }
-      return null;
-    }
-
-  }
-
-  /**
-   * This method will read the records from the sort temp file and keep them in a buffer
-   *
-   * @param numberOfRecords number of records to prefetch
-   * @return the prefetched records
-   * @throws CarbonSortKeyAndGroupByException if reading fails
-   */
-  private Object[][] prefetchRecordsFromFile(int numberOfRecords)
-      throws CarbonSortKeyAndGroupByException {
-    Object[][] records = new Object[numberOfRecords][];
-    for (int i = 0; i < numberOfRecords; i++) {
-      records[i] = getRowFromStream();
-    }
-    return records;
-  }
-}
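
getRowFromStream above decodes rows from a length-prefixed binary layout: each
no-dictionary dimension as a short length followed by bytes, each dictionary dimension as
an int surrogate key, then long words whose bits mark non-null measures, then the measure
values by type. A simplified round-trip sketch of that layout, with one dimension of each
kind and a single nullable double measure (reduced for illustration; not the exact
CarbonData on-disk format):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public final class SortTempRowDemo {
      // one long of null-words is enough for up to 64 measures
      static void writeRow(DataOutputStream out, byte[] noDictDim, int dictDim,
          Double measure) throws IOException {
        out.writeShort(noDictDim.length);    // no-dictionary dim: short length + bytes
        out.write(noDictDim);
        out.writeInt(dictDim);               // dictionary dim: int surrogate key
        long nullWord = (measure != null) ? 1L : 0L; // bit 0 set when measure is present
        out.writeLong(nullWord);
        if (measure != null) {
          out.writeDouble(measure);          // value written only when its bit is set
        }
      }

      static Object[] readRow(DataInputStream in) throws IOException {
        byte[] noDictDim = new byte[in.readShort()];
        in.readFully(noDictDim);
        int dictDim = in.readInt();
        long nullWord = in.readLong();
        Double measure = ((nullWord & 1L) != 0) ? in.readDouble() : null;
        return new Object[] { noDictDim, dictDim, measure };
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
          writeRow(out, "uk".getBytes(StandardCharsets.UTF_8), 42, 3.14);
          writeRow(out, "us".getBytes(StandardCharsets.UTF_8), 7, null);
        }
        try (DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
          for (int i = 0; i < 2; i++) {
            Object[] row = readRow(in);
            System.out.println(new String((byte[]) row[0], StandardCharsets.UTF_8)
                + ", " + row[1] + ", " + row[2]);
          }
        }
      }
    }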

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
deleted file mode 100644
index 5480838..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.merger;
-
-import java.util.AbstractQueue;
-import java.util.PriorityQueue;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRowForMerge;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryMergeHolder;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-public class UnsafeInMemoryIntermediateDataMerger implements Runnable {
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnsafeInMemoryIntermediateDataMerger.class.getName());
-
-  /**
-   * recordHolderHeap
-   */
-  private AbstractQueue<UnsafeInmemoryMergeHolder> recordHolderHeap;
-
-  /**
-   * holderCounter
-   */
-  private int holderCounter;
-
-  /**
-   * entryCount
-   */
-  private int entryCount;
-
-  private UnsafeCarbonRowPage[] unsafeCarbonRowPages;
-
-  private long[] mergedAddresses;
-
-  private byte[] rowPageIndexes;
-
-  /**
-   * UnsafeInMemoryIntermediateDataMerger Constructor
-   */
-  public UnsafeInMemoryIntermediateDataMerger(UnsafeCarbonRowPage[] unsafeCarbonRowPages,
-      int totalSize) {
-    this.holderCounter = unsafeCarbonRowPages.length;
-    this.unsafeCarbonRowPages = unsafeCarbonRowPages;
-    this.mergedAddresses = new long[totalSize];
-    this.rowPageIndexes = new byte[totalSize];
-    this.entryCount = 0;
-  }
-
-  @Override
-  public void run() {
-    long intermediateMergeStartTime = System.currentTimeMillis();
-    int holderCounterConst = holderCounter;
-    try {
-      startSorting();
-      while (hasNext()) {
-        writeDataToMemory(next());
-      }
-      double intermediateMergeCostTime =
-          (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
-      LOGGER.info("============================== Intermediate Merge of " + holderCounterConst
-          + " in-memory sort Cost Time: " + intermediateMergeCostTime + "(s)");
-    } catch (Exception e) {
-      LOGGER.error(e, "Problem while intermediate merging");
-    }
-  }
-
-  /**
-   * This method will be used to get the sorted record from memory
-   *
-   * @return sorted record
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  private UnsafeCarbonRowForMerge getSortedRecordFromMemory()
-      throws CarbonSortKeyAndGroupByException {
-    UnsafeCarbonRowForMerge row = null;
-
-    // poll the top object from the heap
-    // the heap maintains a binary tree ordered by the comparator we pass
-    // to it; calling poll always removes the root of the tree and then
-    // performs a trickle-down operation, whose complexity is log(n)
-    UnsafeInmemoryMergeHolder poll = this.recordHolderHeap.poll();
-
-    // get the row from chunk
-    row = poll.getRow();
-
-    // check if there is no entry left
-    if (!poll.hasNext()) {
-      // change the holder counter
-      --this.holderCounter;
-
-      // return row
-      return row;
-    }
-
-    // read new row
-    poll.readRow();
-
-    // add to heap
-    this.recordHolderHeap.add(poll);
-
-    // return row
-    return row;
-  }
-
-  /**
-   * Below method will be used to start the sorting process. This method will
-   * take all the in-memory row pages, create the record holder heap, read the
-   * first record from each page and initialize the heap.
-   *
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  private void startSorting() throws CarbonSortKeyAndGroupByException {
-    LOGGER.info("Number of row pages in intermediate merger: " + this.holderCounter);
-
-    // create record holder heap
-    createRecordHolderQueue(unsafeCarbonRowPages);
-
-    // iterate over file list and create chunk holder and add to heap
-    LOGGER.info("Started adding first record from row page");
-
-    UnsafeInmemoryMergeHolder unsafePageHolder = null;
-    byte index = 0;
-    for (UnsafeCarbonRowPage unsafeCarbonRowPage : unsafeCarbonRowPages) {
-      // create chunk holder
-      unsafePageHolder = new UnsafeInmemoryMergeHolder(unsafeCarbonRowPage, index++);
-
-      // initialize
-      unsafePageHolder.readRow();
-
-      // add to heap
-      this.recordHolderHeap.add(unsafePageHolder);
-    }
-
-    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
-  }
-
-  /**
-   * This method will be used to create the heap which will be used to hold
-   * the chunk of data
-   */
-  private void createRecordHolderQueue(UnsafeCarbonRowPage[] pages) {
-    // creating record holder heap
-    this.recordHolderHeap = new PriorityQueue<UnsafeInmemoryMergeHolder>(pages.length);
-  }
-
-  /**
-   * This method will be used to get the sorted row
-   *
-   * @return sorted row
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  private UnsafeCarbonRowForMerge next() throws CarbonSortKeyAndGroupByException {
-    return getSortedRecordFromMemory();
-  }
-
-  /**
-   * This method will be used to check whether any more elements are present or
-   * not
-   *
-   * @return whether more elements are present
-   */
-  private boolean hasNext() {
-    return this.holderCounter > 0;
-  }
-
-  /**
-   * Below method will be used to write data to memory
-   */
-  private void writeDataToMemory(UnsafeCarbonRowForMerge row) {
-    mergedAddresses[entryCount] = row.address;
-    rowPageIndexes[entryCount] = row.index;
-    entryCount++;
-  }
-
-  public int getEntryCount() {
-    return entryCount;
-  }
-
-  public UnsafeCarbonRowPage[] getUnsafeCarbonRowPages() {
-    return unsafeCarbonRowPages;
-  }
-
-  public long[] getMergedAddresses() {
-    return mergedAddresses;
-  }
-
-  public byte[] getRowPageIndexes() {
-    return rowPageIndexes;
-  }
-}
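
The merger above is a textbook k-way merge: seed a binary heap with one cursor per sorted
input, repeatedly poll the smallest cursor, emit its current row, then advance and
re-offer it until it is exhausted. The same idea over plain sorted int arrays (names are
illustrative):

    import java.util.PriorityQueue;

    public final class KWayMergeDemo {
      // cursor over one sorted array; the heap orders cursors by their current element
      static final class Cursor implements Comparable<Cursor> {
        final int[] data;
        int pos;
        Cursor(int[] data) { this.data = data; }
        int current() { return data[pos]; }
        boolean hasNext() { return pos + 1 < data.length; }
        @Override public int compareTo(Cursor o) {
          return Integer.compare(current(), o.current());
        }
      }

      public static void main(String[] args) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>();
        heap.add(new Cursor(new int[] {1, 4, 9}));
        heap.add(new Cursor(new int[] {2, 3, 8}));
        heap.add(new Cursor(new int[] {5, 6, 7}));
        // poll is O(log k); every element passes through the heap exactly once
        while (!heap.isEmpty()) {
          Cursor smallest = heap.poll();   // cursor holding the globally smallest row
          System.out.print(smallest.current() + " ");
          if (smallest.hasNext()) {        // advance and re-offer, like readRow() + add()
            smallest.pos++;
            heap.add(smallest);
          }                                // exhausted cursors simply drop out
        }
        System.out.println();              // prints 1 2 3 4 5 6 7 8 9
      }
    }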

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
deleted file mode 100644
index 63f6aab..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.merger;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.AbstractQueue;
-import java.util.Arrays;
-import java.util.PriorityQueue;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.SortTempChunkHolder;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriter;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriterFactory;
-
-public class UnsafeIntermediateFileMerger implements Runnable {
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnsafeIntermediateFileMerger.class.getName());
-
-  /**
-   * recordHolderHeap
-   */
-  private AbstractQueue<SortTempChunkHolder> recordHolderHeap;
-
-  /**
-   * fileCounter
-   */
-  private int fileCounter;
-
-  /**
-   * stream
-   */
-  private DataOutputStream stream;
-
-  /**
-   * totalNumberOfRecords
-   */
-  private int totalNumberOfRecords;
-
-  /**
-   * writer
-   */
-  private TempSortFileWriter writer;
-
-  private SortParameters mergerParameters;
-
-  private File[] intermediateFiles;
-
-  private File outPutFile;
-
-  private boolean[] noDictionarycolumnMapping;
-
-  private long[] nullSetWords;
-
-  private ByteBuffer rowData;
-
-  /**
-   * UnsafeIntermediateFileMerger Constructor
-   */
-  public UnsafeIntermediateFileMerger(SortParameters mergerParameters, File[] intermediateFiles,
-      File outPutFile) {
-    this.mergerParameters = mergerParameters;
-    this.fileCounter = intermediateFiles.length;
-    this.intermediateFiles = intermediateFiles;
-    this.outPutFile = outPutFile;
-    noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
-    this.nullSetWords = new long[((mergerParameters.getMeasureColCount() - 1) >> 6) + 1];
-    // Allocate a 2 MB buffer per row; this should be more than large enough
-    rowData = ByteBuffer.allocate(2 * 1024 * 1024);
-  }
-
-  @Override
-  public void run() {
-    long intermediateMergeStartTime = System.currentTimeMillis();
-    int fileConterConst = fileCounter;
-    boolean isFailed = false;
-    try {
-      startSorting();
-      initialize();
-      while (hasNext()) {
-        writeDataTofile(next());
-      }
-      double intermediateMergeCostTime =
-          (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
-      LOGGER.info("============================== Intermediate Merge of " + fileConterConst
-          + " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
-    } catch (Exception e) {
-      LOGGER.error(e, "Problem while intermediate merging");
-      isFailed = true;
-    } finally {
-      CarbonUtil.closeStreams(this.stream);
-      if (null != writer) {
-        writer.finish();
-      }
-      if (!isFailed) {
-        try {
-          finish();
-        } catch (CarbonSortKeyAndGroupByException e) {
-          LOGGER.error(e, "Problem while deleting the merge file");
-        }
-      } else {
-        if (!outPutFile.delete()) {
-          LOGGER.error("Problem while deleting the merge file");
-        }
-      }
-    }
-  }
-
-  /**
-   * This method is responsible for initializing the output stream
-   *
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  private void initialize() throws CarbonSortKeyAndGroupByException {
-    if (!mergerParameters.isSortFileCompressionEnabled() && !mergerParameters.isPrefetch()) {
-      try {
-        this.stream = new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(outPutFile),
-                mergerParameters.getFileWriteBufferSize()));
-        this.stream.writeInt(this.totalNumberOfRecords);
-      } catch (FileNotFoundException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
-      } catch (IOException e) {
-        throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
-      }
-    } else {
-      writer = TempSortFileWriterFactory.getInstance()
-          .getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
-              mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(),
-              mergerParameters.getMeasureColCount(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getFileWriteBufferSize());
-      writer.initiaize(outPutFile, totalNumberOfRecords);
-    }
-  }
-
-  /**
-   * This method will be used to get the sorted record from file
-   *
-   * @return sorted record
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
-    Object[] row = null;
-
-    // poll the top object from the heap
-    // the heap maintains a binary tree ordered by the comparator we pass
-    // to it; calling poll always removes the root of the tree and then
-    // performs a trickle-down operation, whose complexity is log(n)
-    SortTempChunkHolder poll = this.recordHolderHeap.poll();
-
-    // get the row from chunk
-    row = poll.getRow();
-
-    // check if there is no entry left
-    if (!poll.hasNext()) {
-      // if chunk is empty then close the stream
-      poll.close();
-
-      // change the file counter
-      --this.fileCounter;
-
-      // return row
-      return row;
-    }
-
-    // read new row
-    poll.readRow();
-
-    // add to heap
-    this.recordHolderHeap.add(poll);
-
-    // return row
-    return row;
-  }
-
-  /**
-   * Below method will be used to start the sorting process. This method will
-   * get all the temp files present in the sort temp folder, create the
-   * record holder heap, read the first record from each file and
-   * initialize the heap.
-   *
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  private void startSorting() throws CarbonSortKeyAndGroupByException {
-    LOGGER.info("Number of temp file: " + this.fileCounter);
-
-    // create record holder heap
-    createRecordHolderQueue(intermediateFiles);
-
-    // iterate over file list and create chunk holder and add to heap
-    LOGGER.info("Started adding first record from each file");
-
-    SortTempChunkHolder sortTempFileChunkHolder = null;
-
-    for (File tempFile : intermediateFiles) {
-      // create chunk holder
-      sortTempFileChunkHolder = new UnsafeSortTempFileChunkHolder(tempFile, mergerParameters);
-
-      sortTempFileChunkHolder.readRow();
-      this.totalNumberOfRecords += sortTempFileChunkHolder.numberOfRows();
-
-      // add to heap
-      this.recordHolderHeap.add(sortTempFileChunkHolder);
-    }
-
-    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
-  }
-
-  /**
-   * This method will be used to create the heap which will be used to hold
-   * the chunk of data
-   *
-   * @param listFiles list of temp files
-   */
-  private void createRecordHolderQueue(File[] listFiles) {
-    // creating record holder heap
-    this.recordHolderHeap = new PriorityQueue<SortTempChunkHolder>(listFiles.length);
-  }
-
-  /**
-   * This method will be used to get the sorted row
-   *
-   * @return sorted row
-   * @throws CarbonSortKeyAndGroupByException
-   */
-  private Object[] next() throws CarbonSortKeyAndGroupByException {
-    return getSortedRecordFromFile();
-  }
-
-  /**
-   * This method will be used to check whether any more elements are present or
-   * not
-   *
-   * @return whether more elements are present
-   */
-  private boolean hasNext() {
-    return this.fileCounter > 0;
-  }
-
-  /**
-   * Below method will be used to write data to file
-   *
-   * @throws CarbonSortKeyAndGroupByException problem while writing
-   */
-  private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException {
-    int dimCount = 0;
-    int size = 0;
-    DataType[] type = mergerParameters.getMeasureDataType();
-    for (; dimCount < noDictionarycolumnMapping.length; dimCount++) {
-      if (noDictionarycolumnMapping[dimCount]) {
-        byte[] col = (byte[]) row[dimCount];
-        rowData.putShort((short) col.length);
-        size += 2;
-        rowData.put(col);
-        size += col.length;
-      } else {
-        rowData.putInt((int) row[dimCount]);
-        size += 4;
-      }
-    }
-
-    // write complex dimensions here.
-    int dimensionSize =
-        mergerParameters.getDimColCount() + mergerParameters.getComplexDimColCount();
-    int measureSize = mergerParameters.getMeasureColCount();
-    for (; dimCount < dimensionSize; dimCount++) {
-      byte[] col = (byte[]) row[dimCount];
-      rowData.putShort((short)col.length);
-      size += 2;
-      rowData.put(col);
-      size += col.length;
-    }
-    Arrays.fill(nullSetWords, 0);
-    int nullSetSize = nullSetWords.length * 8;
-    int nullLoc = size;
-    size += nullSetSize;
-    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
-      Object value = row[mesCount + dimensionSize];
-      if (null != value) {
-        switch (type[mesCount]) {
-          case SHORT:
-            rowData.putShort(size, (Short) value);
-            size += 2;
-            break;
-          case INT:
-            rowData.putInt(size, (Integer) value);
-            size += 4;
-            break;
-          case LONG:
-            rowData.putLong(size, (Long) value);
-            size += 8;
-            break;
-          case DOUBLE:
-            rowData.putDouble(size, (Double) value);
-            size += 8;
-            break;
-          case DECIMAL:
-            byte[] bigDecimalInBytes = (byte[]) value;
-            rowData.putShort(size, (short)bigDecimalInBytes.length);
-            size += 2;
-            for (int i = 0; i < bigDecimalInBytes.length; i++) {
-              rowData.put(size++, bigDecimalInBytes[i]);
-            }
-            break;
-        }
-        UnsafeCarbonRowPage.set(nullSetWords, mesCount);
-      } else {
-        UnsafeCarbonRowPage.unset(nullSetWords, mesCount);
-      }
-    }
-    for (int i = 0; i < nullSetWords.length; i++) {
-      rowData.putLong(nullLoc, nullSetWords[i]);
-      nullLoc += 8;
-    }
-    byte[] rowBytes = new byte[size];
-    rowData.position(0);
-    rowData.get(rowBytes);
-    stream.write(rowBytes);
-    rowData.clear();
-  }
-
-  private void finish() throws CarbonSortKeyAndGroupByException {
-    if (recordHolderHeap != null) {
-      int size = recordHolderHeap.size();
-      for (int i = 0; i < size; i++) {
-        recordHolderHeap.poll().close();
-      }
-    }
-    try {
-      CarbonUtil.deleteFiles(intermediateFiles);
-      rowData.clear();
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
-    }
-  }
-}
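
writeDataTofile above sizes its null bitset as ((measureCount - 1) >> 6) + 1 longs, i.e.
one bit per measure packed 64 to a word. The word and bit arithmetic that
UnsafeCarbonRowPage.set/unset/isSet presumably performs looks like this (a sketch, since
that class is not part of this diff):

    public final class NullBitSetDemo {
      // word index = i / 64, bit index = i % 64
      static void set(long[] words, int i)   { words[i >> 6] |=  (1L << (i & 63)); }
      static void unset(long[] words, int i) { words[i >> 6] &= ~(1L << (i & 63)); }
      static boolean isSet(long[] words, int i) {
        return (words[i >> 6] & (1L << (i & 63))) != 0;
      }

      public static void main(String[] args) {
        int measureCount = 70;                          // spans two 64-bit words
        long[] words = new long[((measureCount - 1) >> 6) + 1];
        set(words, 0);
        set(words, 69);                                 // lands in the second word
        unset(words, 0);
        System.out.println(isSet(words, 0) + " " + isSet(words, 69)); // false true
      }
    }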

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java
deleted file mode 100644
index 49791e8..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.unsafe.merger;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-/**
- * It merge-sorts intermediate files into a bigger file.
- */
-public class UnsafeIntermediateMerger {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnsafeIntermediateMerger.class.getName());
-
-  /**
-   * executorService
-   */
-  private ExecutorService executorService;
-  /**
-   * rowPages
-   */
-  private List<UnsafeCarbonRowPage> rowPages;
-
-  private List<UnsafeInMemoryIntermediateDataMerger> mergedPages;
-
-  private SortParameters parameters;
-
-  private final Object lockObject = new Object();
-
-  private boolean offHeap;
-
-  private List<File> procFiles;
-
-  public UnsafeIntermediateMerger(SortParameters parameters) {
-    this.parameters = parameters;
-    // processed file list
-    this.rowPages = new ArrayList<UnsafeCarbonRowPage>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    this.mergedPages = new ArrayList<>();
-    this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores());
-    this.offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
-            CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
-    this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-  }
-
-  public void addDataChunkToMerge(UnsafeCarbonRowPage rowPage) {
-    // add the row page to a list. When the list size reaches the configured
-    // threshold, intermediate merging of in-memory pages will be triggered
-    synchronized (lockObject) {
-      rowPages.add(rowPage);
-    }
-  }
-
-  public void addFileToMerge(File sortTempFile) {
-    // add the sort temp file name to a list. When the list size reaches the
-    // configured threshold, intermediate merging of sort temp files will be triggered
-    synchronized (lockObject) {
-      procFiles.add(sortTempFile);
-    }
-  }
-
-  public void startFileMergingIfPossible() {
-    File[] fileList;
-    if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
-      synchronized (lockObject) {
-        fileList = procFiles.toArray(new File[procFiles.size()]);
-        this.procFiles = new ArrayList<File>();
-      }
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
-      }
-      startIntermediateMerging(fileList);
-    }
-  }
-
-  /**
-   * Below method will be used to start the intermediate file merging
-   *
-   * @param intermediateFiles
-   */
-  private void startIntermediateMerging(File[] intermediateFiles) {
-    //pick a temp location randomly
-    String[] tempFileLocations = parameters.getTempFileLocation();
-    String targetLocation = tempFileLocations[new Random().nextInt(tempFileLocations.length)];
-
-    File file = new File(
-        targetLocation + File.separator + parameters.getTableName() + System
-            .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
-    UnsafeIntermediateFileMerger merger =
-        new UnsafeIntermediateFileMerger(parameters, intermediateFiles, file);
-    executorService.execute(merger);
-  }
-
-  public void startInmemoryMergingIfPossible() throws CarbonSortKeyAndGroupByException {
-    UnsafeCarbonRowPage[] localRowPages;
-    if (rowPages.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
-      int totalRows = 0;
-      synchronized (lockObject) {
-        totalRows = getTotalNumberOfRows(rowPages);
-        if (totalRows <= 0) {
-          return;
-        }
-        localRowPages = rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]);
-        this.rowPages = new ArrayList<>();
-      }
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Sumitting request for intermediate merging of in-memory pages : "
-            + localRowPages.length);
-      }
-      startIntermediateMerging(localRowPages, totalRows);
-    }
-  }
-
-  /**
-   * Below method will be used to start the in-memory intermediate merging
-   *
-   * @param rowPages
-   */
-  private void startIntermediateMerging(UnsafeCarbonRowPage[] rowPages, int totalRows)
-      throws CarbonSortKeyAndGroupByException {
-    UnsafeInMemoryIntermediateDataMerger merger =
-        new UnsafeInMemoryIntermediateDataMerger(rowPages, totalRows);
-    mergedPages.add(merger);
-    executorService.execute(merger);
-  }
-
-  private int getTotalNumberOfRows(List<UnsafeCarbonRowPage> unsafeCarbonRowPages) {
-    int totalSize = 0;
-    for (UnsafeCarbonRowPage unsafeCarbonRowPage : unsafeCarbonRowPages) {
-      totalSize += unsafeCarbonRowPage.getBuffer().getActualSize();
-    }
-    return totalSize;
-  }
-
-  public void finish() throws CarbonSortKeyAndGroupByException {
-    try {
-      executorService.shutdown();
-      executorService.awaitTermination(2, TimeUnit.DAYS);
-    } catch (InterruptedException e) {
-      throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
-    }
-  }
-
-  public void close() {
-    if (executorService.isShutdown()) {
-      executorService.shutdownNow();
-    }
-    rowPages.clear();
-    rowPages = null;
-  }
-
-  public List<UnsafeCarbonRowPage> getRowPages() {
-    return rowPages;
-  }
-
-  public List<UnsafeInMemoryIntermediateDataMerger> getMergedPages() {
-    return mergedPages;
-  }
-}
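
UnsafeIntermediateMerger accumulates pages and files under a lock and, once the count
reaches the configured threshold, swaps the list out atomically and hands the batch to a
fixed thread pool. The accumulate-and-submit pattern in isolation (all names
hypothetical):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public final class ThresholdMergerDemo {
      private static final int MERGE_THRESHOLD = 3;   // stand-in for the configured value
      private final ExecutorService pool = Executors.newFixedThreadPool(2);
      private final Object lock = new Object();
      private List<String> pending = new ArrayList<>();

      void add(String chunk) {
        List<String> batch = null;
        synchronized (lock) {                         // guard the shared list
          pending.add(chunk);
          if (pending.size() >= MERGE_THRESHOLD) {
            batch = pending;                          // swap the list out atomically
            pending = new ArrayList<>();
          }
        }
        if (batch != null) {
          final List<String> toMerge = batch;
          pool.execute(() -> System.out.println("merging " + toMerge)); // async merge
        }
      }

      void finish() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);   // wait for in-flight merges
      }

      public static void main(String[] args) throws InterruptedException {
        ThresholdMergerDemo merger = new ThresholdMergerDemo();
        for (int i = 0; i < 7; i++) merger.add("chunk-" + i);
        merger.finish();  // chunk-6 stays pending, like leftover input for a final merge
      }
    }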

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
deleted file mode 100644
index e3bbdcb..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.merger;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.util.AbstractQueue;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.PriorityQueue;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.processing.newflow.sort.SortStepRowUtil;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.SortTempChunkHolder;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeFinalMergePageHolder;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryHolder;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnsafeSingleThreadFinalSortFilesMerger.class.getName());
-
-  /**
-   * fileCounter
-   */
-  private int fileCounter;
-
-  /**
-   * recordHolderHeapLocal
-   */
-  private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal;
-
-  private SortParameters parameters;
-
-  /**
-   * tempFileLocation
-   */
-  private String[] tempFileLocation;
-
-  private String tableName;
-
-  private boolean isStopProcess;
-
-  public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
-      String[] tempFileLocation) {
-    this.parameters = parameters;
-    this.tempFileLocation = tempFileLocation;
-    this.tableName = parameters.getTableName();
-  }
-
-  /**
-   * This method will be used to merge the merged files
-   *
-   */
-  public void startFinalMerge(UnsafeCarbonRowPage[] rowPages,
-      List<UnsafeInMemoryIntermediateDataMerger> merges) throws CarbonDataWriterException {
-    startSorting(rowPages, merges);
-  }
-
-  /**
-   * Below method will be used to start the sorting process. This method will
-   * collect all the in-memory pages, in-memory merges and sort temp files,
-   * create the record holder heap, read the first record from each source and
-   * initialize the heap.
-   *
-   */
-  private void startSorting(UnsafeCarbonRowPage[] rowPages,
-      List<UnsafeInMemoryIntermediateDataMerger> merges) throws CarbonDataWriterException {
-    try {
-      List<File> filesToMergeSort = getFilesToMergeSort();
-      this.fileCounter = rowPages.length + filesToMergeSort.size() + merges.size();
-      if (fileCounter == 0) {
-        LOGGER.info("No files to merge sort");
-        return;
-      }
-      LOGGER.info("Number of row pages: " + this.fileCounter);
-
-      // create record holder heap
-      createRecordHolderQueue();
-
-      // iterate over file list and create chunk holder and add to heap
-      LOGGER.info("Started adding first record from each page");
-      for (final UnsafeCarbonRowPage rowPage : rowPages) {
-
-        SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage,
-            parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
-                .getMeasureColCount(), parameters.getNumberOfSortColumns());
-
-        // initialize
-        sortTempFileChunkHolder.readRow();
-
-        recordHolderHeapLocal.add(sortTempFileChunkHolder);
-      }
-
-      for (final UnsafeInMemoryIntermediateDataMerger merger : merges) {
-
-        SortTempChunkHolder sortTempFileChunkHolder =
-            new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn(),
-                parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
-                    .getMeasureColCount());
-
-        // initialize
-        sortTempFileChunkHolder.readRow();
-
-        recordHolderHeapLocal.add(sortTempFileChunkHolder);
-      }
-
-      for (final File file : filesToMergeSort) {
-
-        SortTempChunkHolder sortTempFileChunkHolder =
-            new UnsafeSortTempFileChunkHolder(file, parameters);
-
-        // initialize
-        sortTempFileChunkHolder.readRow();
-
-        recordHolderHeapLocal.add(sortTempFileChunkHolder);
-      }
-
-      LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
-    } catch (Exception e) {
-      LOGGER.error(e);
-      throw new CarbonDataWriterException(e.getMessage());
-    }
-  }
-
-  private List<File> getFilesToMergeSort() {
-    FileFilter fileFilter = new FileFilter() {
-      public boolean accept(File pathname) {
-        return pathname.getName().startsWith(tableName);
-      }
-    };
-
-    // get all the merged files
-    List<File> files = new ArrayList<File>(tempFileLocation.length);
-    for (String tempLoc : tempFileLocation) {
-      File[] subFiles = new File(tempLoc).listFiles(fileFilter);
-      if (null != subFiles && subFiles.length > 0) {
-        files.addAll(Arrays.asList(subFiles));
-      }
-    }
-
-    return files;
-  }
-
-  /**
-   * This method will be used to create the heap which will be used to hold
-   * the chunk of data
-   */
-  private void createRecordHolderQueue() {
-    // creating record holder heap
-    this.recordHolderHeapLocal = new PriorityQueue<SortTempChunkHolder>(fileCounter);
-  }
-
-  /**
-   * This method will be used to get the sorted row
-   *
-   * @return sorted row
-   */
-  public Object[] next() {
-    return SortStepRowUtil.convertRow(getSortedRecordFromFile(), parameters);
-  }
-
-  /**
-   * This method will be used to get the sorted record from file
-   *
-   * @return sorted record
-   */
-  private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
-    Object[] row = null;
-
-    // poll the top object from the heap
-    // the heap maintains a binary tree ordered by the comparator we pass
-    // to it; calling poll always removes the root of the tree and then
-    // performs a trickle-down operation, whose complexity is log(n)
-    SortTempChunkHolder poll = this.recordHolderHeapLocal.poll();
-
-    // get the row from chunk
-    row = poll.getRow();
-
-    // check if there is no entry left
-    if (!poll.hasNext()) {
-      // if chunk is empty then close the stream
-      poll.close();
-
-      // change the file counter
-      --this.fileCounter;
-
-      // return row
-      return row;
-    }
-
-    // read new row
-    try {
-      poll.readRow();
-    } catch (Exception e) {
-      throw new CarbonDataWriterException(e.getMessage(), e);
-    }
-
-    // add to heap
-    this.recordHolderHeapLocal.add(poll);
-
-    // return row
-    return row;
-  }
-
-  /**
-   * This method will be used to check whether any more elements are present or
-   * not
-   *
-   * @return whether more elements are present
-   */
-  public boolean hasNext() {
-    return this.fileCounter > 0;
-  }
-
-  public void clear() {
-    if (null != recordHolderHeapLocal) {
-      for (SortTempChunkHolder pageHolder : recordHolderHeapLocal) {
-        pageHolder.close();
-      }
-      recordHolderHeapLocal = null;
-    }
-  }
-
-  public boolean isStopProcess() {
-    return isStopProcess;
-  }
-
-  public void setStopProcess(boolean stopProcess) {
-    isStopProcess = stopProcess;
-  }
-}
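
getFilesToMergeSort above scans every configured temp location for files whose names
start with the table name. The same discovery logic as a compact, standalone sketch
(illustrative names):

    import java.io.File;
    import java.io.FileFilter;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public final class TempFileScanDemo {
      // collect files named <tableName>* across several temp directories
      static List<File> filesToMergeSort(String[] tempLocations, final String tableName) {
        FileFilter filter = new FileFilter() {
          public boolean accept(File pathname) {
            return pathname.getName().startsWith(tableName);
          }
        };
        List<File> files = new ArrayList<>(tempLocations.length);
        for (String loc : tempLocations) {
          File[] subFiles = new File(loc).listFiles(filter); // null if dir is missing
          if (subFiles != null && subFiles.length > 0) {
            files.addAll(Arrays.asList(subFiles));
          }
        }
        return files;
      }

      public static void main(String[] args) {
        System.out.println(filesToMergeSort(
            new String[] { System.getProperty("java.io.tmpdir") }, "my_table"));
      }
    }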