You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:17 UTC

[30/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ThreadStatusObserver.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ThreadStatusObserver.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ThreadStatusObserver.java
deleted file mode 100644
index 56a32a3..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ThreadStatusObserver.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.util.concurrent.ExecutorService;
-
-public class ThreadStatusObserver {
-
-  /**
-   * lock object
-   */
-  private Object lock = new Object();
-
-  private ExecutorService executorService;
-
-  private Throwable throwable;
-
-  public ThreadStatusObserver(ExecutorService executorService) {
-    this.executorService = executorService;
-  }
-
-  public void notifyFailed(Throwable throwable) {
-    // Only the first failing thread should call for shutting down the executor service and
-    // should assign the throwable object else the actual cause for failure can be overridden as
-    // all the running threads will throw interrupted exception on calling shutdownNow and
-    // will override the throwable object
-    if (null == this.throwable) {
-      synchronized (lock) {
-        if (null == this.throwable) {
-          executorService.shutdownNow();
-          this.throwable = throwable;
-        }
-      }
-    }
-  }
-
-  public Throwable getThrowable() {
-    return throwable;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
deleted file mode 100644
index 056c96b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.row.CarbonSortBatch;
-import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * It sorts data in batches and send to the next step.
- */
-public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnsafeBatchParallelReadMergeSorterImpl.class.getName());
-
-  private SortParameters sortParameters;
-
-  private ExecutorService executorService;
-
-  private AtomicLong rowCounter;
-
-  public UnsafeBatchParallelReadMergeSorterImpl(AtomicLong rowCounter) {
-    this.rowCounter = rowCounter;
-  }
-
-  @Override public void initialize(SortParameters sortParameters) {
-    this.sortParameters = sortParameters;
-
-  }
-
-  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
-      throws CarbonDataLoadingException {
-    this.executorService = Executors.newFixedThreadPool(iterators.length);
-    this.threadStatusObserver = new ThreadStatusObserver(this.executorService);
-    int batchSize = CarbonProperties.getInstance().getBatchSize();
-    final SortBatchHolder sortBatchHolder = new SortBatchHolder(sortParameters, iterators.length,
-        this.threadStatusObserver);
-
-    try {
-      for (int i = 0; i < iterators.length; i++) {
-        executorService.execute(
-            new SortIteratorThread(iterators[i], sortBatchHolder, batchSize, rowCounter,
-                this.threadStatusObserver));
-      }
-    } catch (Exception e) {
-      checkError();
-      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
-    }
-    checkError();
-    // Creates the iterator to read from merge sorter.
-    Iterator<CarbonSortBatch> batchIterator = new CarbonIterator<CarbonSortBatch>() {
-
-      @Override public boolean hasNext() {
-        return sortBatchHolder.hasNext();
-      }
-
-      @Override public CarbonSortBatch next() {
-        return new CarbonSortBatch(sortBatchHolder.next());
-      }
-    };
-    return new Iterator[] { batchIterator };
-  }
-
-  @Override public void close() {
-    executorService.shutdown();
-    try {
-      executorService.awaitTermination(2, TimeUnit.DAYS);
-    } catch (InterruptedException e) {
-      LOGGER.error(e);
-    }
-  }
-
-  /**
-   * This thread iterates the iterator and adds the rows
-   */
-  private static class SortIteratorThread implements Runnable {
-
-    private Iterator<CarbonRowBatch> iterator;
-
-    private SortBatchHolder sortDataRows;
-
-    private Object[][] buffer;
-
-    private AtomicLong rowCounter;
-
-    private ThreadStatusObserver threadStatusObserver;
-
-    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortBatchHolder sortDataRows,
-        int batchSize, AtomicLong rowCounter, ThreadStatusObserver threadStatusObserver) {
-      this.iterator = iterator;
-      this.sortDataRows = sortDataRows;
-      this.buffer = new Object[batchSize][];
-      this.rowCounter = rowCounter;
-      this.threadStatusObserver = threadStatusObserver;
-    }
-
-    @Override
-    public void run() {
-      try {
-        while (iterator.hasNext()) {
-          CarbonRowBatch batch = iterator.next();
-          int i = 0;
-          while (batch.hasNext()) {
-            CarbonRow row = batch.next();
-            if (row != null) {
-              buffer[i++] = row.getData();
-            }
-          }
-          if (i > 0) {
-            synchronized (sortDataRows) {
-              sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i);
-              rowCounter.getAndAdd(i);
-              if (!sortDataRows.getSortDataRow().canAdd()) {
-                sortDataRows.finish(false);
-                sortDataRows.createSortDataRows();
-              }
-            }
-          }
-        }
-      } catch (Exception e) {
-        LOGGER.error(e);
-        this.threadStatusObserver.notifyFailed(e);
-      } finally {
-        sortDataRows.finishThread();
-      }
-    }
-
-  }
-
-  private static class SortBatchHolder
-      extends CarbonIterator<UnsafeSingleThreadFinalSortFilesMerger> {
-
-    private SortParameters sortParameters;
-
-    private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
-
-    private UnsafeIntermediateMerger unsafeIntermediateFileMerger;
-
-    private UnsafeSortDataRows sortDataRow;
-
-    private final BlockingQueue<UnsafeSingleThreadFinalSortFilesMerger> mergerQueue;
-
-    private AtomicInteger iteratorCount;
-
-    private int batchCount;
-
-    private ThreadStatusObserver threadStatusObserver;
-
-    private final Object lock = new Object();
-
-    public SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
-        ThreadStatusObserver threadStatusObserver) {
-      this.sortParameters = sortParameters.getCopy();
-      this.iteratorCount = new AtomicInteger(numberOfThreads);
-      this.mergerQueue = new LinkedBlockingQueue<>(1);
-      this.threadStatusObserver = threadStatusObserver;
-      createSortDataRows();
-    }
-
-    private void createSortDataRows() {
-      int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
-      setTempLocation(sortParameters);
-      this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
-          sortParameters.getTempFileLocation());
-      unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
-      sortDataRow = new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger,
-          inMemoryChunkSizeInMB);
-
-      try {
-        sortDataRow.initialize();
-      } catch (MemoryException e) {
-        throw new CarbonDataLoadingException(e);
-      }
-      batchCount++;
-    }
-
-    private void setTempLocation(SortParameters parameters) {
-      String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-          .getLocalDataFolderLocation(parameters.getDatabaseName(),
-            parameters.getTableName(), parameters.getTaskNo(), batchCount + "",
-            parameters.getSegmentId(), false, false);
-      String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
-          File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-      parameters.setTempFileLocation(tempDirs);
-    }
-
-    @Override public UnsafeSingleThreadFinalSortFilesMerger next() {
-      try {
-        UnsafeSingleThreadFinalSortFilesMerger unsafeSingleThreadFinalSortFilesMerger =
-            mergerQueue.take();
-        if (unsafeSingleThreadFinalSortFilesMerger.isStopProcess()) {
-          throw new RuntimeException(threadStatusObserver.getThrowable());
-        }
-        return unsafeSingleThreadFinalSortFilesMerger;
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    public UnsafeSortDataRows getSortDataRow() {
-      return sortDataRow;
-    }
-
-    public void finish(boolean isFinalAttempt) {
-      try {
-        // if the mergerQue is empty and some CarbonDataLoadingException exception has occurred
-        // then set stop process to true in the finalmerger instance
-        if (mergerQueue.isEmpty() && threadStatusObserver != null
-            && threadStatusObserver.getThrowable() != null && threadStatusObserver
-            .getThrowable() instanceof CarbonDataLoadingException) {
-          finalMerger.setStopProcess(true);
-          if (isFinalAttempt) {
-            iteratorCount.decrementAndGet();
-          }
-          mergerQueue.put(finalMerger);
-          return;
-        }
-        processRowToNextStep(sortDataRow, sortParameters);
-        unsafeIntermediateFileMerger.finish();
-        List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages();
-        finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
-            unsafeIntermediateFileMerger.getMergedPages());
-        unsafeIntermediateFileMerger.close();
-        if (isFinalAttempt) {
-          iteratorCount.decrementAndGet();
-        }
-        mergerQueue.put(finalMerger);
-        sortDataRow = null;
-        unsafeIntermediateFileMerger = null;
-        finalMerger = null;
-      } catch (CarbonDataWriterException e) {
-        throw new CarbonDataLoadingException(e);
-      } catch (CarbonSortKeyAndGroupByException e) {
-        throw new CarbonDataLoadingException(e);
-      } catch (InterruptedException e) {
-        // if fails to put in queue because of interrupted exception, we can offer to free the main
-        // thread from waiting.
-        if (finalMerger != null) {
-          finalMerger.setStopProcess(true);
-          boolean offered = mergerQueue.offer(finalMerger);
-          if (!offered) {
-            throw new CarbonDataLoadingException(e);
-          }
-        }
-        throw new CarbonDataLoadingException(e);
-      }
-    }
-
-    public void finishThread() {
-      synchronized (lock) {
-        if (iteratorCount.get() <= 1) {
-          finish(true);
-        } else {
-          iteratorCount.decrementAndGet();
-        }
-      }
-    }
-
-    public boolean hasNext() {
-      return iteratorCount.get() > 0 || !mergerQueue.isEmpty();
-    }
-
-    /**
-     * Below method will be used to process data to next step
-     */
-    private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
-        throws CarbonDataLoadingException {
-      if (null == sortDataRows) {
-        LOGGER.info("Record Processed For table: " + parameters.getTableName());
-        LOGGER.info("Number of Records was Zero");
-        String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
-        LOGGER.info(logMessage);
-        return false;
-      }
-
-      try {
-        // start sorting
-        sortDataRows.startSorting();
-
-        // check any more rows are present
-        LOGGER.info("Record Processed For table: " + parameters.getTableName());
-        CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-            .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-        CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-            .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
-                System.currentTimeMillis());
-        return false;
-      } catch (InterruptedException e) {
-        throw new CarbonDataLoadingException(e);
-      }
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
deleted file mode 100644
index a0d43ba..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be merge sorted to get
- * final merge sort result.
- */
-public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnsafeParallelReadMergeSorterImpl.class.getName());
-
-  private SortParameters sortParameters;
-
-  private UnsafeIntermediateMerger unsafeIntermediateFileMerger;
-
-  private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
-
-  private AtomicLong rowCounter;
-
-  public UnsafeParallelReadMergeSorterImpl(AtomicLong rowCounter) {
-    this.rowCounter = rowCounter;
-  }
-
-  @Override public void initialize(SortParameters sortParameters) {
-    this.sortParameters = sortParameters;
-    unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
-
-    finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
-        sortParameters.getTempFileLocation());
-  }
-
-  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
-      throws CarbonDataLoadingException {
-    int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
-    UnsafeSortDataRows sortDataRow =
-        new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger, inMemoryChunkSizeInMB);
-    final int batchSize = CarbonProperties.getInstance().getBatchSize();
-    try {
-      sortDataRow.initialize();
-    } catch (MemoryException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
-    this.threadStatusObserver = new ThreadStatusObserver(executorService);
-
-    try {
-      for (int i = 0; i < iterators.length; i++) {
-        executorService.execute(
-            new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter,
-                this.threadStatusObserver));
-      }
-      executorService.shutdown();
-      executorService.awaitTermination(2, TimeUnit.DAYS);
-      processRowToNextStep(sortDataRow, sortParameters);
-    } catch (Exception e) {
-      checkError();
-      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
-    }
-    checkError();
-    try {
-      unsafeIntermediateFileMerger.finish();
-      List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages();
-      finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
-          unsafeIntermediateFileMerger.getMergedPages());
-    } catch (CarbonDataWriterException e) {
-      throw new CarbonDataLoadingException(e);
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-
-    // Creates the iterator to read from merge sorter.
-    Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() {
-
-      @Override public boolean hasNext() {
-        return finalMerger.hasNext();
-      }
-
-      @Override public CarbonRowBatch next() {
-        int counter = 0;
-        CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
-        while (finalMerger.hasNext() && counter < batchSize) {
-          rowBatch.addRow(new CarbonRow(finalMerger.next()));
-          counter++;
-        }
-        return rowBatch;
-      }
-    };
-    return new Iterator[] { batchIterator };
-  }
-
-  @Override public void close() {
-    unsafeIntermediateFileMerger.close();
-    finalMerger.clear();
-  }
-
-  /**
-   * Below method will be used to process data to next step
-   */
-  private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
-      throws CarbonDataLoadingException {
-    if (null == sortDataRows) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
-    try {
-      // start sorting
-      sortDataRows.startSorting();
-
-      // check any more rows are present
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-      return false;
-    } catch (InterruptedException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  /**
-   * This thread iterates the iterator and adds the rows
-   */
-  private static class SortIteratorThread implements Runnable {
-
-    private Iterator<CarbonRowBatch> iterator;
-
-    private UnsafeSortDataRows sortDataRows;
-
-    private Object[][] buffer;
-
-    private AtomicLong rowCounter;
-
-    private ThreadStatusObserver threadStatusObserver;
-
-    public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
-        UnsafeSortDataRows sortDataRows, int batchSize, AtomicLong rowCounter,
-        ThreadStatusObserver threadStatusObserver) {
-      this.iterator = iterator;
-      this.sortDataRows = sortDataRows;
-      this.buffer = new Object[batchSize][];
-      this.rowCounter = rowCounter;
-      this.threadStatusObserver = threadStatusObserver;
-    }
-
-    @Override
-    public void run() {
-      try {
-        while (iterator.hasNext()) {
-          CarbonRowBatch batch = iterator.next();
-          int i = 0;
-          while (batch.hasNext()) {
-            CarbonRow row = batch.next();
-            if (row != null) {
-              buffer[i++] = row.getData();
-            }
-          }
-          if (i > 0) {
-            sortDataRows.addRowBatch(buffer, i);
-            rowCounter.getAndAdd(i);
-          }
-        }
-      } catch (Exception e) {
-        LOGGER.error(e);
-        this.threadStatusObserver.notifyFailed(e);
-      }
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
deleted file mode 100644
index 54e0180..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It reads data in parallel from an array of iterators and merge sorts it.
- * First it sorts the data and writes it to temp files. These temp files are then merge sorted
- * to get the final sorted result.
- * This step is specifically for bucketing: it sorts the data of each bucket separately and
- * writes it to temp files.
- */
-public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(
-                UnsafeParallelReadMergeSorterWithBucketingImpl.class.getName());
-
-  private SortParameters sortParameters;
-
-  private BucketingInfo bucketingInfo;
-
-  /**
-   * Creates a sorter for bucketed data.
-   *
-   * @param inputDataFields not used by this constructor
-   * @param bucketingInfo bucketing information; its number of buckets decides how many
-   *                      per-bucket sorters are created when sorting
-   */
-  public UnsafeParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields,
-      BucketingInfo bucketingInfo) {
-    this.bucketingInfo = bucketingInfo;
-  }
-
-  /**
-   * Stores the sort parameters; they are copied once per bucket when sorting starts.
-   *
-   * @param sortParameters parameters used as the template for every bucket
-   */
-  @Override public void initialize(SortParameters sortParameters) {
-    this.sortParameters = sortParameters;
-  }
-
-  /**
-   * Sorts the rows of all input iterators, keeping the data of each bucket separate.
-   *
-   * One sorter and one intermediate merger are created per bucket. The input iterators are
-   * drained in parallel, one worker per iterator, and every row is added to the sorter of its
-   * bucket. After the workers have finished, sorting is started on each bucket sorter, the
-   * intermediate mergers are finished and one lazily merging iterator per bucket is returned.
-   *
-   * @param iterators input iterators, each drained by its own worker thread
-   * @return one iterator per bucket
-   * @throws CarbonDataLoadingException if the sorters cannot be initialized, or if reading,
-   *                                    sorting or merging fails
-   */
-  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
-      throws CarbonDataLoadingException {
-    UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[bucketingInfo.getNumberOfBuckets()];
-    UnsafeIntermediateMerger[] intermediateFileMergers =
-        new UnsafeIntermediateMerger[sortDataRows.length];
-    // Split the configured in-memory chunk size across the buckets, but never go below 5 MB.
-    int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
-    inMemoryChunkSizeInMB = inMemoryChunkSizeInMB / bucketingInfo.getNumberOfBuckets();
-    if (inMemoryChunkSizeInMB < 5) {
-      inMemoryChunkSizeInMB = 5;
-    }
-    try {
-      for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
-        // Every bucket works on its own copy of the parameters, keyed by the bucket number.
-        SortParameters parameters = sortParameters.getCopy();
-        parameters.setPartitionID(i + "");
-        setTempLocation(parameters);
-        intermediateFileMergers[i] = new UnsafeIntermediateMerger(parameters);
-        sortDataRows[i] =
-            new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB);
-        sortDataRows[i].initialize();
-      }
-    } catch (MemoryException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
-    this.threadStatusObserver = new ThreadStatusObserver(executorService);
-    final int batchSize = CarbonProperties.getInstance().getBatchSize();
-    try {
-      for (int i = 0; i < iterators.length; i++) {
-        executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, this
-            .threadStatusObserver));
-      }
-      executorService.shutdown();
-      executorService.awaitTermination(2, TimeUnit.DAYS);
-      processRowToNextStep(sortDataRows, sortParameters);
-    } catch (Exception e) {
-      if (e instanceof InterruptedException) {
-        // Restore the interrupt status so that code further up the stack can still see it.
-        Thread.currentThread().interrupt();
-      }
-      // Do not leave worker threads running behind a failed load.
-      executorService.shutdownNow();
-      checkError();
-      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
-    }
-    checkError();
-    try {
-      for (int i = 0; i < intermediateFileMergers.length; i++) {
-        intermediateFileMergers[i].finish();
-      }
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(e);
-    }
-
-    // Generic arrays cannot be created directly; every slot is filled with a CarbonRowBatch
-    // iterator right below, so the unchecked conversion is safe.
-    @SuppressWarnings("unchecked")
-    Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
-    for (int i = 0; i < sortDataRows.length; i++) {
-      batchIterator[i] =
-          new MergedDataIterator(String.valueOf(i), batchSize, intermediateFileMergers[i]);
-    }
-
-    return batchIterator;
-  }
-
-  /**
-   * Builds the final merger for one bucket, pointed at the sort temp folders resolved for that
-   * bucket.
-   *
-   * @param bucketId id of the bucket whose local data folders are resolved
-   * @return a new final merger working on the bucket's sort temp folders
-   */
-  private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
-    String taskNo = String.valueOf(sortParameters.getTaskNo());
-    String segmentId = sortParameters.getSegmentId() + "";
-    String[] localFolders = CarbonDataProcessorUtil.getLocalDataFolderLocation(
-        sortParameters.getDatabaseName(), sortParameters.getTableName(), taskNo, bucketId,
-        segmentId, false, false);
-    // Append the sort temp directory to every local data folder.
-    String[] sortTempFolders = CarbonDataProcessorUtil.arrayAppend(localFolders, File.separator,
-        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-    return new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, sortTempFolders);
-  }
-
-  /**
-   * Does nothing; this implementation performs no work on close.
-   */
-  @Override public void close() {
-  }
-
-  /**
-   * Starts sorting on every bucket sorter and records the sort step timings.
-   *
-   * @param sortDataRows per-bucket sorters; when null or empty only a zero-record summary is
-   *                     logged
-   * @param parameters sort parameters supplying the table name and the partition id
-   * @return always {@code false}
-   * @throws CarbonDataLoadingException wrapping any exception raised while starting the sort
-   */
-  private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows, SortParameters parameters)
-      throws CarbonDataLoadingException {
-    boolean nothingToSort = sortDataRows == null || sortDataRows.length == 0;
-    if (nothingToSort) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      LOGGER.info("Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0);
-      return false;
-    }
-
-    try {
-      for (UnsafeSortDataRows bucketSorter : sortDataRows) {
-        bucketSorter.startSorting();
-      }
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      // Both statistics are stamped with the current time, keyed by the partition id.
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(e);
-    }
-    return false;
-  }
-
-  /**
-   * Resolves the sort temp folders for the given parameters and stores them on the parameters.
-   *
-   * @param parameters sort parameters whose database, table, task, partition and segment select
-   *                   the local data folders; updated in place with the temp file location
-   */
-  private void setTempLocation(SortParameters parameters) {
-    String[] localDataFolders = CarbonDataProcessorUtil.getLocalDataFolderLocation(
-        parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(),
-        parameters.getPartitionID(), parameters.getSegmentId(), false, false);
-    String[] sortTempFolders = CarbonDataProcessorUtil.arrayAppend(localDataFolders,
-        File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-    parameters.setTempFileLocation(sortTempFolders);
-  }
-
-  /**
-   * This thread iterates the iterator and adds each row to the {@link UnsafeSortDataRows} of the
-   * row's bucket.
-   */
-  private static class SortIteratorThread implements Runnable {
-
-    private final Iterator<CarbonRowBatch> iterator;
-
-    private final UnsafeSortDataRows[] sortDataRows;
-
-    private final ThreadStatusObserver threadStatusObserver;
-
-    /**
-     * @param iterator source of the row batches consumed by this worker
-     * @param sortDataRows sorters indexed by bucket number
-     * @param threadStatusObserver observer notified when this worker fails
-     */
-    public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
-        UnsafeSortDataRows[] sortDataRows, ThreadStatusObserver threadStatusObserver) {
-      this.iterator = iterator;
-      this.sortDataRows = sortDataRows;
-      this.threadStatusObserver = threadStatusObserver;
-    }
-
-    /**
-     * Drains the iterator and adds every non-null row to the sorter selected by the row's
-     * bucket number. Any failure is logged and reported to the thread status observer instead
-     * of being propagated.
-     */
-    @Override
-    public void run() {
-      try {
-        while (iterator.hasNext()) {
-          CarbonRowBatch batch = iterator.next();
-          while (batch.hasNext()) {
-            CarbonRow row = batch.next();
-            if (row != null) {
-              UnsafeSortDataRows sortDataRow = sortDataRows[row.bucketNumber];
-              // The same sorter array is handed to every worker, so several threads may feed
-              // one bucket sorter; serialize access to it.
-              synchronized (sortDataRow) {
-                sortDataRow.addRow(row.getData());
-              }
-            }
-          }
-        }
-      } catch (Exception e) {
-        LOGGER.error(e);
-        this.threadStatusObserver.notifyFailed(e);
-      }
-    }
-
-  }
-
-  /**
-   * Iterator over the merged rows of one bucket. The final merge is started lazily on first
-   * use, and rows are returned in batches of at most {@code batchSize} rows.
-   */
-  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
-
-    private String partitionId;
-
-    private int batchSize;
-
-    private boolean firstRow;
-
-    private UnsafeIntermediateMerger intermediateMerger;
-
-    private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
-
-    /**
-     * @param partitionId bucket id used to locate the bucket's sort temp folders
-     * @param batchSize maximum number of rows per returned batch
-     * @param intermediateMerger merger that supplies the row pages and merged pages to merge
-     */
-    public MergedDataIterator(String partitionId, int batchSize,
-        UnsafeIntermediateMerger intermediateMerger) {
-      this.partitionId = partitionId;
-      this.batchSize = batchSize;
-      this.intermediateMerger = intermediateMerger;
-      this.firstRow = true;
-    }
-
-    @Override public boolean hasNext() {
-      startFinalMergeIfNeeded();
-      return finalMerger.hasNext();
-    }
-
-    /**
-     * Returns the next batch, holding up to {@code batchSize} rows; the batch is empty when the
-     * merger has no more rows.
-     */
-    @Override public CarbonRowBatch next() {
-      // next() may be called without a preceding hasNext(); make sure the merger exists
-      // instead of failing with a NullPointerException.
-      startFinalMergeIfNeeded();
-      int counter = 0;
-      CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
-      while (finalMerger.hasNext() && counter < batchSize) {
-        rowBatch.addRow(new CarbonRow(finalMerger.next()));
-        counter++;
-      }
-      return rowBatch;
-    }
-
-    /**
-     * Creates the final merger and starts the final merge on the first call; later calls do
-     * nothing.
-     */
-    private void startFinalMergeIfNeeded() {
-      if (firstRow) {
-        firstRow = false;
-        finalMerger = getFinalMerger(partitionId);
-        List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
-        finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
-            intermediateMerger.getMergedPages());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
deleted file mode 100644
index 8b23437..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Arrays;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.memory.IntPointerBuffer;
-import org.apache.carbondata.core.memory.MemoryBlock;
-import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
-/**
- * It can keep the data of prescribed size data in offheap/onheap memory and returns it when needed
- */
-public class UnsafeCarbonRowPage {
-
-  private boolean[] noDictionaryDimensionMapping;
-
-  private boolean[] noDictionarySortColumnMapping;
-
-  private int dimensionSize;
-
-  private int measureSize;
-
-  private DataType[] measureDataType;
-
-  private long[] nullSetWords;
-
-  private IntPointerBuffer buffer;
-
-  private int lastSize;
-
-  private long sizeToBeUsed;
-
-  private MemoryBlock dataBlock;
-
-  private boolean saveToDisk;
-
-  private MemoryManagerType managerType;
-
-  private long taskId;
-
-  /**
-   * Creates a row page backed by the given memory block.
-   *
-   * @param noDictionaryDimensionMapping per-dimension flags; {@code true} marks a dimension that
-   *                                     is stored as a length-prefixed byte array, {@code false}
-   *                                     one that is stored as an int
-   * @param noDictionarySortColumnMapping flags kept for callers; only exposed through its getter
-   * @param dimensionSize total number of dimensions in a row
-   * @param measureSize number of measures in a row
-   * @param type data type of each measure
-   * @param memoryBlock memory block the rows are written to
-   * @param saveToDisk flag returned unchanged by {@link #isSaveToDisk()}
-   * @param taskId id passed to the memory managers and the pointer buffer
-   */
-  public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
-      boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
-      MemoryBlock memoryBlock, boolean saveToDisk, long taskId) {
-    this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
-    this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
-    this.dimensionSize = dimensionSize;
-    this.measureSize = measureSize;
-    this.measureDataType = type;
-    this.saveToDisk = saveToDisk;
-    // one bit per measure, rounded up to whole 64-bit words
-    this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
-    this.taskId = taskId;
-    buffer = new IntPointerBuffer(this.taskId);
-    this.dataBlock = memoryBlock;
-    // TODO Only 95% of the block is used (5% is held back) to be on the safe side. Maybe we can
-    // have different logic.
-    sizeToBeUsed = dataBlock.size() - (dataBlock.size() * 5) / 100;
-    this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
-  }
-
-  /**
-   * Appends a row at the current end of the page and records its offset in the pointer buffer.
-   *
-   * @param row row to write; dimensions first, measures after them
-   * @return number of bytes the row occupies in the page
-   */
-  public int addRow(Object[] row) {
-    final int rowOffset = lastSize;
-    final int writtenBytes = addRow(row, dataBlock.getBaseOffset() + rowOffset);
-    // The pointer buffer stores the offset of the row relative to the start of the block.
-    buffer.set(rowOffset);
-    lastSize = rowOffset + writtenBytes;
-    return writtenBytes;
-  }
-
-  /**
-   * Serializes one row into the data block, starting at the given address.
-   *
-   * Layout written, in order: the dimensions covered by noDictionaryDimensionMapping (a 2-byte
-   * length plus the bytes for flagged entries, a 4-byte int otherwise), the remaining dimensions
-   * up to dimensionSize (2-byte length plus bytes), the null bit-set words, and finally the
-   * non-null measures encoded by their data type.
-   *
-   * @param row row to write; must not be null
-   * @param address address at which the row starts
-   * @return number of bytes written for the row
-   * @throws RuntimeException if the row is null
-   * @throws IllegalArgumentException if a measure has an unsupported data type
-   */
-  private int addRow(Object[] row, long address) {
-    if (row == null) {
-      throw new RuntimeException("Row is null ??");
-    }
-    int dimCount = 0;
-    int size = 0;
-    Object baseObject = dataBlock.getBaseObject();
-    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
-      if (noDictionaryDimensionMapping[dimCount]) {
-        byte[] col = (byte[]) row[dimCount];
-        // NOTE(review): the length is narrowed to a short, so values longer than
-        // Short.MAX_VALUE bytes do not round-trip - confirm callers never pass such values.
-        CarbonUnsafe.getUnsafe()
-            .putShort(baseObject, address + size, (short) col.length);
-        size += 2;
-        CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
-            address + size, col.length);
-        size += col.length;
-      } else {
-        int value = (int) row[dimCount];
-        CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, value);
-        size += 4;
-      }
-    }
-
-    // write complex dimensions here.
-    for (; dimCount < dimensionSize; dimCount++) {
-      byte[] col = (byte[]) row[dimCount];
-      CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) col.length);
-      size += 2;
-      CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
-          address + size, col.length);
-      size += col.length;
-    }
-    Arrays.fill(nullSetWords, 0);
-    int nullSetSize = nullSetWords.length * 8;
-    // reserve room for the bit-set now; it is copied in after the measures have been written
-    int nullWordLoc = size;
-    size += nullSetSize;
-    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
-      Object value = row[mesCount + dimensionSize];
-      if (null != value) {
-        switch (measureDataType[mesCount]) {
-          case SHORT:
-            Short sval = (Short) value;
-            CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval);
-            size += 2;
-            break;
-          case INT:
-            Integer ival = (Integer) value;
-            CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, ival);
-            size += 4;
-            break;
-          case LONG:
-            Long val = (Long) value;
-            CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, val);
-            size += 8;
-            break;
-          case DOUBLE:
-            Double doubleVal = (Double) value;
-            CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, doubleVal);
-            size += 8;
-            break;
-          case DECIMAL:
-            BigDecimal decimalVal = (BigDecimal) value;
-            byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
-            CarbonUnsafe.getUnsafe().putShort(baseObject, address + size,
-                (short) bigDecimalInBytes.length);
-            size += 2;
-            CarbonUnsafe.getUnsafe()
-                .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
-                    address + size, bigDecimalInBytes.length);
-            size += bigDecimalInBytes.length;
-            break;
-          default:
-            throw  new IllegalArgumentException("unsupported data type:" +
-                measureDataType[mesCount]);
-        }
-        // a set bit marks a measure whose value is present (non-null)
-        set(nullSetWords, mesCount);
-      } else {
-        unset(nullSetWords, mesCount);
-      }
-    }
-    CarbonUnsafe.getUnsafe().copyMemory(nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET, baseObject,
-        address + nullWordLoc, nullSetSize);
-    return size;
-  }
-
-  /**
-   * Reads the row stored at the given address back into the supplied array.
-   *
-   * The values are read in the order in which they were written: dimensions, the null bit-set
-   * words, then the measures. A measure whose bit is not set is returned as {@code null}. The
-   * nullSetWords field is reused as scratch space while reading.
-   *
-   * @param address address at which the row starts
-   * @param rowToFill array that receives the dimension and measure values
-   * @return the same {@code rowToFill} array
-   * @throws IllegalArgumentException if a measure has an unsupported data type
-   */
-  public Object[] getRow(long address, Object[] rowToFill) {
-    int dimCount = 0;
-    int size = 0;
-
-    Object baseObject = dataBlock.getBaseObject();
-    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
-      if (noDictionaryDimensionMapping[dimCount]) {
-        // length-prefixed byte array: 2-byte length followed by the bytes
-        short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-        byte[] col = new byte[aShort];
-        size += 2;
-        CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
-                col.length);
-        size += col.length;
-        rowToFill[dimCount] = col;
-      } else {
-        int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-        size += 4;
-        rowToFill[dimCount] = anInt;
-      }
-    }
-
-    // read complex dimensions here.
-    for (; dimCount < dimensionSize; dimCount++) {
-      short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-      byte[] col = new byte[aShort];
-      size += 2;
-      CarbonUnsafe.getUnsafe()
-          .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
-      size += col.length;
-      rowToFill[dimCount] = col;
-    }
-
-    // load the bit-set that tells which measures have a stored value
-    int nullSetSize = nullSetWords.length * 8;
-    Arrays.fill(nullSetWords, 0);
-    CarbonUnsafe.getUnsafe()
-        .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
-            nullSetSize);
-    size += nullSetSize;
-
-    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
-      if (isSet(nullSetWords, mesCount)) {
-        switch (measureDataType[mesCount]) {
-          case SHORT:
-            Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-            size += 2;
-            rowToFill[dimensionSize + mesCount] = sval;
-            break;
-          case INT:
-            Integer ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-            size += 4;
-            rowToFill[dimensionSize + mesCount] = ival;
-            break;
-          case LONG:
-            Long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
-            size += 8;
-            rowToFill[dimensionSize + mesCount] = val;
-            break;
-          case DOUBLE:
-            Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
-            size += 8;
-            rowToFill[dimensionSize + mesCount] = doubleVal;
-            break;
-          case DECIMAL:
-            short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-            byte[] bigDecimalInBytes = new byte[aShort];
-            size += 2;
-            CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
-                CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
-            size += bigDecimalInBytes.length;
-            rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
-            break;
-          default:
-            throw new IllegalArgumentException("unsupported data type:" +
-                measureDataType[mesCount]);
-        }
-      } else {
-        rowToFill[dimensionSize + mesCount] = null;
-      }
-    }
-    return rowToFill;
-  }
-
-  /**
-   * Copies the row stored at the given address to the stream.
-   *
-   * The values are written in the order in which they are stored: dimensions (length-prefixed
-   * bytes or an int), the null bit-set words as longs, then only those measures whose bit is
-   * set. The nullSetWords field is reused as scratch space while reading.
-   *
-   * @param address address at which the row starts
-   * @param stream stream that receives the row
-   * @throws IOException if writing to the stream fails
-   * @throws IllegalArgumentException if a measure has an unsupported data type
-   */
-  public void fillRow(long address, DataOutputStream stream) throws IOException {
-    int dimCount = 0;
-    int size = 0;
-
-    Object baseObject = dataBlock.getBaseObject();
-    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
-      if (noDictionaryDimensionMapping[dimCount]) {
-        // length-prefixed byte array: 2-byte length followed by the bytes
-        short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-        byte[] col = new byte[aShort];
-        size += 2;
-        CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
-                col.length);
-        size += col.length;
-        stream.writeShort(aShort);
-        stream.write(col);
-      } else {
-        int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-        size += 4;
-        stream.writeInt(anInt);
-      }
-    }
-
-    // copy complex dimensions to the stream here.
-    for (; dimCount < dimensionSize; dimCount++) {
-      short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-      byte[] col = new byte[aShort];
-      size += 2;
-      CarbonUnsafe.getUnsafe()
-          .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
-      size += col.length;
-      stream.writeShort(aShort);
-      stream.write(col);
-    }
-
-    // load the bit-set that tells which measures have a stored value, and forward it as-is
-    int nullSetSize = nullSetWords.length * 8;
-    Arrays.fill(nullSetWords, 0);
-    CarbonUnsafe.getUnsafe()
-        .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
-            nullSetSize);
-    size += nullSetSize;
-    for (int i = 0; i < nullSetWords.length; i++) {
-      stream.writeLong(nullSetWords[i]);
-    }
-
-    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
-      if (isSet(nullSetWords, mesCount)) {
-        switch (measureDataType[mesCount]) {
-          case SHORT:
-            short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-            size += 2;
-            stream.writeShort(sval);
-            break;
-          case INT:
-            int ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-            size += 4;
-            stream.writeInt(ival);
-            break;
-          case LONG:
-            long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
-            size += 8;
-            stream.writeLong(val);
-            break;
-          case DOUBLE:
-            double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
-            size += 8;
-            stream.writeDouble(doubleVal);
-            break;
-          case DECIMAL:
-            short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-            byte[] bigDecimalInBytes = new byte[aShort];
-            size += 2;
-            CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
-                CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
-            size += bigDecimalInBytes.length;
-            stream.writeShort(aShort);
-            stream.write(bigDecimalInBytes);
-            break;
-          default:
-            throw new IllegalArgumentException("unsupported data type:" +
-                measureDataType[mesCount]);
-        }
-      }
-    }
-  }
-
-  /**
-   * Releases the data block through the memory manager selected by the current manager type.
-   * The pointer buffer is released only in the sort-memory-manager case.
-   * NOTE(review): confirm that the pointer buffer needs no release in the other case.
-   */
-  public void freeMemory() {
-    if (managerType == MemoryManagerType.UNSAFE_MEMORY_MANAGER) {
-      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
-    } else {
-      UnsafeSortMemoryManager.INSTANCE.freeMemory(taskId, dataBlock);
-      buffer.freeMemory();
-    }
-  }
-
-  /**
-   * @return the save-to-disk flag given at construction
-   */
-  public boolean isSaveToDisk() {
-    return saveToDisk;
-  }
-
-  /**
-   * @return the pointer buffer holding the offset of every row added to this page
-   */
-  public IntPointerBuffer getBuffer() {
-    return buffer;
-  }
-
-  /**
-   * @return number of bytes written to the page so far
-   */
-  public int getUsedSize() {
-    return lastSize;
-  }
-
-  /**
-   * @return {@code true} while the bytes written so far are below the usable size of the block
-   */
-  public boolean canAdd() {
-    return lastSize < sizeToBeUsed;
-  }
-
-  /**
-   * @return the memory block currently backing this page
-   */
-  public MemoryBlock getDataBlock() {
-    return dataBlock;
-  }
-
-  /**
-   * Sets the bit at the given index.
-   *
-   * @param words bit-set storage, 64 bits per element
-   * @param index index of the bit to set
-   */
-  public static void set(long[] words, int index) {
-    int wordOffset = (index >> 6);
-    // a long shift uses only the low six bits of the distance, so this selects bit index % 64
-    words[wordOffset] |= (1L << index);
-  }
-
-  /**
-   * Clears the bit at the given index.
-   *
-   * @param words bit-set storage, 64 bits per element
-   * @param index index of the bit to clear
-   */
-  public static void unset(long[] words, int index) {
-    int wordOffset = (index >> 6);
-    // a long shift uses only the low six bits of the distance, so this selects bit index % 64
-    words[wordOffset] &= ~(1L << index);
-  }
-
-  /**
-   * Tests the bit at the given index.
-   *
-   * @param words bit-set storage, 64 bits per element
-   * @param index index of the bit to test
-   * @return {@code true} if the bit is set
-   */
-  public static boolean isSet(long[] words, int index) {
-    int wordOffset = (index >> 6);
-    // a long shift uses only the low six bits of the distance, so this selects bit index % 64
-    return ((words[wordOffset] & (1L << index)) != 0);
-  }
-
-  /**
-   * @return the no-dictionary dimension flags given at construction (the internal array itself,
-   *         not a copy)
-   */
-  public boolean[] getNoDictionaryDimensionMapping() {
-    return noDictionaryDimensionMapping;
-  }
-
-  /**
-   * @return the no-dictionary sort column flags given at construction (the internal array
-   *         itself, not a copy)
-   */
-  public boolean[] getNoDictionarySortColumnMapping() {
-    return noDictionarySortColumnMapping;
-  }
-
-  /**
-   * Replaces the backing memory block and switches the manager type to the sort memory manager,
-   * which is the manager freeMemory() then uses to release the block.
-   * The previous block is not released here.
-   *
-   * @param newMemoryBlock block that backs this page from now on
-   */
-  public void setNewDataBlock(MemoryBlock newMemoryBlock) {
-    this.dataBlock = newMemoryBlock;
-    this.managerType = MemoryManagerType.UNSAFE_SORT_MEMORY_MANAGER;
-  }
-
-  /**
-   * Selects the memory manager that freeMemory() uses to release the data block of a page.
-   */
-  public enum MemoryManagerType {
-    UNSAFE_MEMORY_MANAGER, UNSAFE_SORT_MEMORY_MANAGER
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
deleted file mode 100644
index dda0d89..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.memory.IntPointerBuffer;
-import org.apache.carbondata.core.memory.MemoryBlock;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparator;
-import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
-import org.apache.carbondata.processing.newflow.sort.unsafe.sort.TimSort;
-import org.apache.carbondata.processing.newflow.sort.unsafe.sort.UnsafeIntSortDataFormat;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class UnsafeSortDataRows {
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnsafeSortDataRows.class.getName());
-  /**
-   * threadStatusObserver
-   */
-  private ThreadStatusObserver threadStatusObserver;
-  /**
-   * executor service for data sort holder
-   */
-  private ExecutorService dataSorterAndWriterExecutorService;
-  /**
-   * semaphore which will used for managing sorted data object arrays
-   */
-
-  private SortParameters parameters;
-
-  private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger;
-
-  private UnsafeCarbonRowPage rowPage;
-
-  private final Object addRowsLock = new Object();
-
-  private long inMemoryChunkSize;
-
-  private boolean enableInMemoryIntermediateMerge;
-
-  private int bytesAdded;
-
-  private long maxSizeAllowed;
-
-  /**
-   * semaphore which will used for managing sorted data object arrays
-   */
-  private Semaphore semaphore;
-
-  private final long taskId;
-
-  /**
-   * Creates a sorter bound to the calling thread's carbon task.
-   *
-   * @param parameters sort configuration
-   * @param unsafeInMemoryIntermediateFileMerger merger that receives the sorted pages and files
-   * @param inMemoryChunkSize size of one working memory chunk; it is multiplied by 1024 * 1024
-   *                          here, i.e. treated as megabytes
-   */
-  public UnsafeSortDataRows(SortParameters parameters,
-      UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
-    this.parameters = parameters;
-    this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger;
-    // observer of writing file in thread
-    this.threadStatusObserver = new ThreadStatusObserver();
-    this.taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
-    this.inMemoryChunkSize = inMemoryChunkSize * 1024L * 1024L;
-
-    String inMemoryMergeFlag = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
-            CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT);
-    this.enableInMemoryIntermediateMerge = Boolean.parseBoolean(inMemoryMergeFlag);
-
-    long batchSortSizeInMb = parameters.getBatchSortSizeinMb();
-    if (batchSortSizeInMb > 0) {
-      this.maxSizeAllowed = batchSortSizeInMb * 1024L * 1024L;
-    } else {
-      // If user does not input any memory size, then take half the size of usable memory
-      // configured in sort memory size.
-      this.maxSizeAllowed = UnsafeMemoryManager.INSTANCE.getUsableMemory() / 2;
-    }
-  }
-
-  /**
-   * Prepares the sorter for accepting rows: allocates the first working memory block, wraps it
-   * in a row page, recreates the sort temp locations and creates the sort/write thread pool
-   * together with the semaphore that limits the number of pages in flight.
-   *
-   * @throws MemoryException propagated from the memory allocation done here
-   */
-  public void initialize() throws MemoryException {
-    MemoryBlock workingBlock =
-        UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
-    // reserve the same amount in the sort memory manager when it has room; a page without
-    // such a reservation is flagged to be saved to disk
-    boolean fitsInSortMemory =
-        UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(workingBlock.size());
-    if (fitsInSortMemory) {
-      UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(workingBlock.size());
-    }
-    boolean saveToDisk = !fitsInSortMemory;
-    this.rowPage = new UnsafeCarbonRowPage(
-        parameters.getNoDictionaryDimnesionColumn(),
-        parameters.getNoDictionarySortColumn(),
-        parameters.getDimColCount() + parameters.getComplexDimColCount(),
-        parameters.getMeasureColCount(),
-        parameters.getMeasureDataType(),
-        workingBlock,
-        saveToDisk, taskId);
-
-    // Delete if any older file exists in sort temp folder, then create it afresh
-    deleteSortLocationIfExists();
-    CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
-
-    int numberOfCores = parameters.getNumberOfCores();
-    this.dataSorterAndWriterExecutorService = Executors.newFixedThreadPool(numberOfCores);
-    this.semaphore = new Semaphore(numberOfCores);
-  }
-
-  /**
-   * @return true while the bytes added so far are below maxSizeAllowed
-   */
-  public boolean canAdd() {
-    return maxSizeAllowed > bytesAdded;
-  }
-
-  /**
-   * Adds a batch of rows while holding the add-rows lock, so concurrent callers are
-   * serialized.
-   *
-   * @param rowBatch rows to add
-   * @param size number of leading entries of rowBatch to add
-   * @throws CarbonSortKeyAndGroupByException if handing over a full page or adding a row fails
-   */
-  public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
-    // a full row page is sorted (and possibly written to file) by addBatch; the lock keeps
-    // two callers from filling the same page at once
-    synchronized (this.addRowsLock) {
-      this.addBatch(rowBatch, size);
-    }
-  }
-
-  /**
-   * Adds a batch of rows without taking the add-rows lock.
-   *
-   * @param rowBatch rows to add
-   * @param size number of leading entries of rowBatch to add
-   * @throws CarbonSortKeyAndGroupByException if handing over a full page or adding a row fails
-   */
-  public void addRowBatchWithOutSync(Object[][] rowBatch, int size)
-      throws CarbonSortKeyAndGroupByException {
-    // a full row page is sorted (and possibly written to file) by addBatch
-    this.addBatch(rowBatch, size);
-  }
-
-  /**
-   * Adds the first {@code size} rows of the batch to the current row page. When the page
-   * cannot take another row it is handed to a sort/write task, a new page is allocated and
-   * the pending row goes into the new page.
-   *
-   * @param rowBatch rows to add
-   * @param size number of leading entries of rowBatch to add
-   * @throws CarbonSortKeyAndGroupByException wraps any failure while rolling over to a new page
-   */
-  private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
-    for (int i = 0; i < size; i++) {
-      if (rowPage.canAdd()) {
-        bytesAdded += rowPage.addRow(rowBatch[i]);
-      } else {
-        try {
-          if (enableInMemoryIntermediateMerge) {
-            unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
-          }
-          unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
-          // the permit is released by the sort/write task when it finishes
-          semaphore.acquire();
-          dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(rowPage));
-          MemoryBlock memoryBlock =
-              UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
-          // Same rule as initialize(): reserve sort memory when there is room for the page and
-          // only save the page to disk when there is not. The flag used to be assigned the
-          // un-negated availability, which spilled pages exactly when memory was free.
-          boolean isMemoryAvailable =
-              UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
-          if (isMemoryAvailable) {
-            UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
-          }
-          rowPage = new UnsafeCarbonRowPage(
-                  parameters.getNoDictionaryDimnesionColumn(),
-                  parameters.getNoDictionarySortColumn(),
-                  parameters.getDimColCount() + parameters.getComplexDimColCount(),
-                  parameters.getMeasureColCount(),
-                  parameters.getMeasureDataType(),
-                  memoryBlock,
-                  !isMemoryAvailable, taskId);
-          bytesAdded += rowPage.addRow(rowBatch[i]);
-        } catch (Exception e) {
-          LOGGER.error(
-                  "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
-          throw new CarbonSortKeyAndGroupByException(e);
-        }
-
-      }
-    }
-  }
-
-  /**
-   * Adds a single row to the current row page. When the page cannot take another row it is
-   * handed to a sort/write task, a new page is allocated and the row goes into the new page.
-   *
-   * @param row row to add
-   * @throws CarbonSortKeyAndGroupByException wraps any failure while rolling over to a new page
-   */
-  public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
-    if (rowPage.canAdd()) {
-      rowPage.addRow(row);
-    } else {
-      try {
-        if (enableInMemoryIntermediateMerge) {
-          unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
-        }
-        unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
-        // the permit is released by the sort/write task when it finishes
-        semaphore.acquire();
-        dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
-        MemoryBlock memoryBlock =
-            UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
-        // Same rule as initialize(): reserve sort memory when there is room for the page and
-        // only save the page to disk when there is not. The flag used to be assigned the
-        // un-negated availability, which spilled pages exactly when memory was free.
-        boolean isMemoryAvailable =
-            UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(memoryBlock.size());
-        if (isMemoryAvailable) {
-          UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
-        }
-        // the dimension count includes complex dimensions, exactly as for the first page
-        // created in initialize(); every page of one sorter must share the same row layout
-        rowPage = new UnsafeCarbonRowPage(
-            parameters.getNoDictionaryDimnesionColumn(),
-            parameters.getNoDictionarySortColumn(),
-            parameters.getDimColCount() + parameters.getComplexDimColCount(),
-            parameters.getMeasureColCount(),
-            parameters.getMeasureDataType(), memoryBlock,
-            !isMemoryAvailable, taskId);
-        rowPage.addRow(row);
-      } catch (Exception e) {
-        LOGGER.error(
-            "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
-        throw new CarbonSortKeyAndGroupByException(e);
-      }
-
-    }
-  }
-
-  /**
-   * Below method will be used to start storing process This method will get
-   * all the temp files present in sort temp folder then it will create the
-   * record holder heap and then it will read first record from each file and
-   * initialize the heap
-   *
-   * @throws InterruptedException
-   */
-  public void startSorting() throws InterruptedException {
-    LOGGER.info("Unsafe based sorting will be used");
-    if (this.rowPage.getUsedSize() > 0) {
-      TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
-          new UnsafeIntSortDataFormat(rowPage));
-      if (parameters.getNumberOfNoDictSortColumns() > 0) {
-        timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
-            new UnsafeRowComparator(rowPage));
-      } else {
-        timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
-            new UnsafeRowComparatorForNormalDIms(rowPage));
-      }
-      unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
-    } else {
-      rowPage.freeMemory();
-    }
-    startFileBasedMerge();
-  }
-
-  /**
-   * Writes a row page to a file: first the number of rows as an int, then every row in the
-   * order given by the page's pointer buffer.
-   *
-   * @param rowPage page to write
-   * @param file target file
-   * @throws CarbonSortKeyAndGroupByException if an I/O error occurs while writing
-   */
-  private void writeData(UnsafeCarbonRowPage rowPage, File file)
-      throws CarbonSortKeyAndGroupByException {
-    DataOutputStream out = null;
-    try {
-      FileOutputStream fileOut = new FileOutputStream(file);
-      out = new DataOutputStream(
-          new BufferedOutputStream(fileOut, parameters.getFileWriteBufferSize()));
-      IntPointerBuffer pointers = rowPage.getBuffer();
-      int rowCount = pointers.getActualSize();
-      // write number of entries to the file
-      out.writeInt(rowCount);
-      int rowIndex = 0;
-      while (rowIndex < rowCount) {
-        // the buffer stores addresses relative to the data block's base offset
-        long rowAddress = pointers.get(rowIndex) + rowPage.getDataBlock().getBaseOffset();
-        rowPage.fillRow(rowAddress, out);
-        rowIndex++;
-      }
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
-    } finally {
-      // close streams
-      CarbonUtil.closeStreams(out);
-    }
-  }
-
-  /**
-   * This method will be used to delete sort temp location is it is exites
-   */
-  public void deleteSortLocationIfExists() {
-    CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
-  }
-
-  /**
-   * Below method will be used to start file based merge
-   *
-   * @throws InterruptedException
-   */
-  private void startFileBasedMerge() throws InterruptedException {
-    dataSorterAndWriterExecutorService.shutdown();
-    dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS);
-  }
-
-  /**
-   * Failure hook for the sort/write tasks. When a task fails, the remaining work of this
-   * sorter is stopped and the failure is recorded.
-   */
-  private class ThreadStatusObserver {
-    /**
-     * Called when a sort/write task fails. Shuts down the sort/write thread pool immediately,
-     * closes the intermediate merger, marks the observer from the sort parameters as failed,
-     * logs the failure and rethrows it wrapped.
-     *
-     * @param exception the failure raised by the task
-     * @throws CarbonSortKeyAndGroupByException always; wraps {@code exception}
-     */
-    public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException {
-      // stop queued and running sort/write tasks before tearing down the merger
-      dataSorterAndWriterExecutorService.shutdownNow();
-      unsafeInMemoryIntermediateFileMerger.close();
-      parameters.getObserver().setFailed(true);
-      LOGGER.error(exception);
-      throw new CarbonSortKeyAndGroupByException(exception);
-    }
-  }
-
-  /**
-   * Task that sorts one filled row page and then, depending on the page's saveToDisk flag,
-   * either writes it to a new sort temp file or moves it into sort memory. The result is
-   * registered with the intermediate merger, and the semaphore permit taken by the submitter
-   * is released when the task ends.
-   */
-  private class DataSorterAndWriter implements Runnable {
-    private UnsafeCarbonRowPage page;
-
-    public DataSorterAndWriter(UnsafeCarbonRowPage rowPage) {
-      this.page = rowPage;
-    }
-
-    @Override
-    public void run() {
-      try {
-        long startTime = System.currentTimeMillis();
-        IntPointerBuffer pointers = page.getBuffer();
-        TimSort<UnsafeCarbonRow, IntPointerBuffer> sorter =
-            new TimSort<>(new UnsafeIntSortDataFormat(page));
-        // with no-dictionary sort columns the variable length comparator is required
-        if (parameters.getNumberOfNoDictSortColumns() > 0) {
-          sorter.sort(pointers, 0, pointers.getActualSize(), new UnsafeRowComparator(page));
-        } else {
-          sorter.sort(pointers, 0, pointers.getActualSize(),
-              new UnsafeRowComparatorForNormalDIms(page));
-        }
-
-        if (!page.isSaveToDisk()) {
-          // the size was already reserved in the sort memory manager when the page was
-          // created, so the lazy allocator is used here
-          MemoryBlock workingBlock = page.getDataBlock();
-          MemoryBlock sortMemoryBlock =
-              UnsafeSortMemoryManager.INSTANCE.allocateMemoryLazy(taskId, workingBlock.size());
-          // copy the rows from working memory into sort memory
-          CarbonUnsafe.getUnsafe().copyMemory(
-              workingBlock.getBaseObject(), workingBlock.getBaseOffset(),
-              sortMemoryBlock.getBaseObject(), sortMemoryBlock.getBaseOffset(),
-              workingBlock.size());
-          // release the working block and let the page use the copy
-          page.freeMemory();
-          page.setNewDataBlock(sortMemoryBlock);
-          pointers.loadToUnsafe();
-          // register the in-memory chunk for merging
-          unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(page);
-          LOGGER.info(
-              "Time taken to sort row page with size" + pointers.getActualSize() + "is: "
-                  + (System.currentTimeMillis() - startTime));
-        } else {
-          // create a new file and pick a temp directory randomly every time
-          String[] tempLocations = parameters.getTempFileLocation();
-          String tmpDir = tempLocations[new Random().nextInt(tempLocations.length)];
-          String sortTempFileName = tmpDir + File.separator + parameters.getTableName()
-              + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT;
-          File sortTempFile = new File(sortTempFileName);
-          writeData(page, sortTempFile);
-          LOGGER.info("Time taken to sort row page with size" + pointers.getActualSize()
-              + " and write is: " + (System.currentTimeMillis() - startTime));
-          page.freeMemory();
-          // register the sort temp file for merging
-          unsafeInMemoryIntermediateFileMerger.addFileToMerge(sortTempFile);
-        }
-      } catch (Throwable e) {
-        try {
-          threadStatusObserver.notifyFailed(e);
-        } catch (CarbonSortKeyAndGroupByException ex) {
-          LOGGER.error(e);
-        }
-      } finally {
-        semaphore.release();
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
deleted file mode 100644
index c54dcd6..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.comparator;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow;
-
-/**
- * Compares two rows stored in unsafe memory by their sort columns. A no-dictionary sort
- * column is stored as a 2-byte length followed by that many bytes and is compared as a byte
- * array; every other sort column is stored as a 4-byte int and is compared numerically.
- */
-public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
-
-  /**
-   * mapping of dictionary and no dictionary of sort_columns.
-   */
-  private boolean[] noDictionarySortColumnMaping;
-
-  private Object baseObject;
-
-  public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
-    this.noDictionarySortColumnMaping = rowPage.getNoDictionarySortColumnMapping();
-    this.baseObject = rowPage.getDataBlock().getBaseObject();
-  }
-
-  /**
-   * Compares two rows that both live in the page this comparator was created for.
-   *
-   * @return negative, zero or positive when rowL sorts before, equal to or after rowR
-   */
-  @Override
-  public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
-    return compare(rowL, baseObject, rowR, baseObject);
-  }
-
-  /**
-   * Compares two rows that may live in different memory blocks.
-   *
-   * @param rowL left row
-   * @param baseObjectL base object of the block holding rowL
-   * @param rowR right row
-   * @param baseObjectR base object of the block holding rowR
-   * @return negative, zero or positive when rowL sorts before, equal to or after rowR
-   */
-  public int compare(UnsafeCarbonRow rowL, Object baseObjectL, UnsafeCarbonRow rowR,
-      Object baseObjectR) {
-    long rowA = rowL.address;
-    long rowB = rowR.address;
-    // running offsets of the current sort column inside each row
-    int sizeA = 0;
-    int sizeB = 0;
-    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
-      if (isNoDictionary) {
-        byte[] byteArr1 = readNoDictionaryValue(baseObjectL, rowA + sizeA);
-        sizeA += 2 + byteArr1.length;
-        byte[] byteArr2 = readNoDictionaryValue(baseObjectR, rowB + sizeB);
-        sizeB += 2 + byteArr2.length;
-
-        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-        if (difference != 0) {
-          return difference;
-        }
-      } else {
-        int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeA);
-        sizeA += 4;
-        int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeB);
-        sizeB += 4;
-        // Integer.compare instead of subtraction: a difference can overflow and flip sign
-        int diff = Integer.compare(dimFieldA, dimFieldB);
-        if (diff != 0) {
-          return diff;
-        }
-      }
-    }
-    return 0;
-  }
-
-  /**
-   * Copies one no-dictionary value onto the heap: reads the 2-byte length at the given
-   * address and then that many bytes following it.
-   */
-  private static byte[] readNoDictionaryValue(Object base, long lengthAddress) {
-    short length = CarbonUnsafe.getUnsafe().getShort(base, lengthAddress);
-    byte[] value = new byte[length];
-    CarbonUnsafe.getUnsafe()
-        .copyMemory(base, lengthAddress + 2, value, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-    return value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
deleted file mode 100644
index 53f976f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.comparator;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow;
-
-/**
- * Compares two rows of one row page when every sort column is stored as a 4-byte int at the
- * start of the row.
- */
-public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbonRow> {
-
-  private Object baseObject;
-
-  private int numberOfSortColumns;
-
-  public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) {
-    this.baseObject = rowPage.getDataBlock().getBaseObject();
-    this.numberOfSortColumns = rowPage.getNoDictionarySortColumnMapping().length;
-  }
-
-  /**
-   * Compares the sort columns of two rows from left to right and stops at the first column
-   * that differs.
-   *
-   * @return negative, zero or positive when rowL sorts before, equal to or after rowR
-   */
-  @Override
-  public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
-    long rowA = rowL.address;
-    long rowB = rowR.address;
-    // both rows advance by the same fixed width, so a single offset is enough
-    int offset = 0;
-    for (int i = 0; i < numberOfSortColumns; i++) {
-      int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + offset);
-      int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + offset);
-      offset += 4;
-      // Integer.compare instead of subtraction: a difference can overflow and flip sign
-      int diff = Integer.compare(dimFieldA, dimFieldB);
-      if (diff != 0) {
-        return diff;
-      }
-    }
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/SortTempChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/SortTempChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/SortTempChunkHolder.java
deleted file mode 100644
index 9eab940..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/SortTempChunkHolder.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-/**
- * Interface for merging temporary sort files/ inmemory data. Holders are comparable with
- * each other.
- */
-public interface SortTempChunkHolder extends Comparable<SortTempChunkHolder> {
-
-  // NOTE(review): presumably true while the chunk still has unread rows; confirm against
-  // the implementations
-  boolean hasNext();
-
-  // NOTE(review): presumably advances to the next row so that getRow() returns it; confirm
-  void readRow()  throws CarbonSortKeyAndGroupByException;
-
-  // NOTE(review): presumably the row made current by the last readRow() call; confirm
-  Object[] getRow();
-
-  // NOTE(review): presumably the total number of rows in the chunk; confirm
-  int numberOfRows();
-
-  // NOTE(review): presumably releases the file or memory behind the chunk; confirm
-  void close();
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRow.java
deleted file mode 100644
index aff60f6..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRow.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-/**
- * Handle to a row kept in unsafe memory; it carries only the row's address.
- */
-public class UnsafeCarbonRow {
-
-  // the unsafe row comparators read the row's columns starting at this address
-  public long address;
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRowForMerge.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
deleted file mode 100644
index 0ec4553..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeCarbonRowForMerge.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-/**
- * Row handle used while merging; adds an index to the row address.
- */
-public class UnsafeCarbonRowForMerge extends UnsafeCarbonRow {
-
-  // NOTE(review): presumably the index of the row page this row belongs to; confirm against
-  // the merger that fills it
-  public byte index;
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
deleted file mode 100644
index f00dd45..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe.holder;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator;
-
-public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnsafeFinalMergePageHolder.class.getName());
-
-  /** Total number of entries to serve, taken from the merger's entry count. */
-  private final int actualSize;
-
-  /** Per-entry address handed to the selected row page when the entry is read. */
-  private final long[] mergedAddresses;
-
-  /** Per-entry position in rowPages of the page that is asked for the row. */
-  private final byte[] rowPageIndexes;
-
-  private final UnsafeCarbonRowPage[] rowPages;
-  private final NewRowComparator comparator;
-  private final int columnSize;
-
-  /** Position of the next entry to read. */
-  private int counter;
-  /** Row filled by the most recent readRow() call; null until the first call. */
-  private Object[] currentRow;
-
-  /**
-   * Captures the merger's entry count, addresses, page indexes and row pages.
-   * @param noDictSortColumnMapping handed to the NewRowComparator behind compareTo
-   * @param columnSize length of each row array created by readRow()
-   */
-  public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
-      boolean[] noDictSortColumnMapping, int columnSize) {
-    this.actualSize = merger.getEntryCount();
-    this.mergedAddresses = merger.getMergedAddresses();
-    this.rowPageIndexes = merger.getRowPageIndexes();
-    this.rowPages = merger.getUnsafeCarbonRowPages();
-    LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new NewRowComparator(noDictSortColumnMapping);
-    this.columnSize = columnSize;
-  }
-
-  /** @return true while at least one entry has not been read yet */
-  public boolean hasNext() {
-    return counter < actualSize;
-  }
-
-  /** Reads the next entry into a fresh row array and advances the position. */
-  public void readRow() {
-    currentRow = new Object[columnSize];
-    UnsafeCarbonRowPage page = rowPages[rowPageIndexes[counter]];
-    page.getRow(mergedAddresses[counter], currentRow);
-    counter++;
-  }
-
-  public Object[] getRow() {
-    return currentRow;
-  }
-
-  @Override public int compareTo(SortTempChunkHolder o) {
-    return comparator.compare(currentRow, o.getRow());
-  }
-
-  /** Identity equality: a holder is equal only to itself. */
-  @Override public boolean equals(Object obj) {
-    return this == obj;
-  }
-
-  @Override public int hashCode() {
-    return super.hashCode();
-  }
-
-  public int numberOfRows() {
-    return actualSize;
-  }
-
-  /** Calls freeMemory() on every row page obtained from the merger. */
-  public void close() {
-    for (UnsafeCarbonRowPage page : rowPages) {
-      page.freeMemory();
-    }
-  }
-}