You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/10/05 11:09:30 UTC

carbondata git commit: [CARBONDATA-1410] Fixed thread leak issue in case of data loading

Repository: carbondata
Updated Branches:
  refs/heads/master 7d3bf0503 -> a734add5a


[CARBONDATA-1410] Fixed thread leak issue in case of data loading

Problem: In case of data loading failure threads are not getting closed and its causing thread leak in long run, because of this OOM is coming
Solution: Close all the thread in case of failure , Success and killing

This closes #1401


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a734add5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a734add5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a734add5

Branch: refs/heads/master
Commit: a734add5a95790f207d21a2e0dcc4e1480d51932
Parents: 7d3bf05
Author: kumarvishal <ku...@gmail.com>
Authored: Mon Sep 25 18:24:03 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Oct 5 16:38:48 2017 +0530

----------------------------------------------------------------------
 .../exception/CarbonDataWriterException.java    |  9 +++
 .../core/util/CarbonThreadFactory.java          | 47 ++++++++++++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  9 ++-
 .../converter/impl/RowConverterImpl.java        |  5 +-
 .../sort/impl/ParallelReadMergeSorterImpl.java  |  9 ++-
 .../impl/UnsafeParallelReadMergeSorterImpl.java |  9 ++-
 .../loading/sort/unsafe/UnsafeSortDataRows.java | 11 ++-
 .../holder/UnsafeSortTempFileChunkHolder.java   |  4 +-
 .../merger/UnsafeIntermediateFileMerger.java    | 40 ++++++----
 .../unsafe/merger/UnsafeIntermediateMerger.java | 31 +++++---
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  3 +-
 .../loading/steps/InputProcessorStepImpl.java   |  5 +-
 .../sort/sortdata/IntermediateFileMerger.java   | 43 +++++++----
 .../SingleThreadFinalSortFilesMerger.java       | 81 ++++++++++++++------
 .../processing/sort/sortdata/SortDataRows.java  | 23 ++++--
 .../sortdata/SortIntermediateFileMerger.java    | 25 +++++-
 .../sort/sortdata/SortTempFileChunkHolder.java  | 11 ++-
 .../store/CarbonFactDataHandlerColumnar.java    | 19 ++++-
 .../store/writer/AbstractFactDataWriter.java    |  4 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   |  6 +-
 .../processing/util/CarbonLoaderUtil.java       |  4 +-
 21 files changed, 308 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java b/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java
index 8f75ee8..f9f44a4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java
@@ -44,6 +44,15 @@ public class CarbonDataWriterException extends RuntimeException {
   /**
    * Constructor
    *
+   * @param t exception.
+   */
+  public CarbonDataWriterException(Throwable t) {
+    super(t);
+  }
+
+  /**
+   * Constructor
+   *
    * @param msg The error message for this exception.
    */
   public CarbonDataWriterException(String msg, Throwable t) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/core/src/main/java/org/apache/carbondata/core/util/CarbonThreadFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonThreadFactory.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonThreadFactory.java
new file mode 100644
index 0000000..689365d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonThreadFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Carbon thread factory class
+ */
+public class CarbonThreadFactory implements ThreadFactory {
+
+  /**
+   * default thread factory
+   */
+  private ThreadFactory defaultFactory;
+
+  /**
+   * pool name
+   */
+  private String name;
+
+  public CarbonThreadFactory(String name) {
+    this.defaultFactory = Executors.defaultThreadFactory();
+    this.name = name;
+  }
+
+  @Override public Thread newThread(Runnable r) {
+    final Thread thread = defaultFactory.newThread(r);
+    thread.setName(name);
+    return thread;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 1d1b47a..49b708c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -299,8 +299,9 @@ class NewCarbonDataLoadRDD[K, V](
                 split.serializableHadoopSplit.value.getPartition.getUniqueID)
           }
           partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
-
-          StandardLogService.setThreadName(partitionID, null)
+          StandardLogService.setThreadName(StandardLogService
+            .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
+            , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
           CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
               partitionID, split.partitionBlocksDetail.length)
           val readers =
@@ -328,7 +329,9 @@ class NewCarbonDataLoadRDD[K, V](
           } else {
             model = carbonLoadModel.getCopyWithPartition(partitionID)
           }
-          StandardLogService.setThreadName(blocksID, null)
+          StandardLogService.setThreadName(StandardLogService
+            .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
+            , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
           val readers =
             split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
           readers.zipWithIndex.map { case (reader, index) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index a4351ae..79c6d61 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.loading.BadRecordsLogger;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
@@ -117,7 +118,9 @@ public class RowConverterImpl implements RowConverter {
     // for one pass load, start the dictionary client
     if (configuration.getUseOnePass()) {
       if (executorService == null) {
-        executorService = Executors.newCachedThreadPool();
+        executorService = Executors.newCachedThreadPool(new CarbonThreadFactory(
+            "DictionaryClientPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier()
+                .getTableName()));
       }
       Future<DictionaryClient> result = executorService.submit(new Callable<DictionaryClient>() {
         @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index 6e43fcb..cefc97d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
@@ -59,6 +60,8 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
 
   private AtomicLong rowCounter;
 
+  private ExecutorService executorService;
+
   public ParallelReadMergeSorterImpl(AtomicLong rowCounter) {
     this.rowCounter = rowCounter;
   }
@@ -94,7 +97,8 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
     } catch (CarbonSortKeyAndGroupByException e) {
       throw new CarbonDataLoadingException(e);
     }
-    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+    this.executorService = Executors.newFixedThreadPool(iterators.length,
+        new CarbonThreadFactory("SafeParallelSorterPool:" + sortParameters.getTableName()));
     this.threadStatusObserver = new ThreadStatusObserver(executorService);
 
     try {
@@ -147,6 +151,9 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
     if (intermediateFileMerger != null) {
       intermediateFileMerger.close();
     }
+    if (null != executorService && !executorService.isShutdown()) {
+      executorService.shutdownNow();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index 1a2f704..c05c027 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
@@ -59,6 +60,8 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
 
   private AtomicLong rowCounter;
 
+  private ExecutorService executorService;
+
   public UnsafeParallelReadMergeSorterImpl(AtomicLong rowCounter) {
     this.rowCounter = rowCounter;
   }
@@ -82,7 +85,8 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
     } catch (MemoryException e) {
       throw new CarbonDataLoadingException(e);
     }
-    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+    this.executorService = Executors.newFixedThreadPool(iterators.length,
+        new CarbonThreadFactory("UnsafeParallelSorterPool:" + sortParameters.getTableName()));
     this.threadStatusObserver = new ThreadStatusObserver(executorService);
 
     try {
@@ -131,6 +135,9 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
   }
 
   @Override public void close() {
+    if (null != executorService && !executorService.isShutdown()) {
+      executorService.shutdownNow();
+    }
     unsafeIntermediateFileMerger.close();
     finalMerger.clear();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 88b72aa..0210464 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
@@ -137,8 +138,9 @@ public class UnsafeSortDataRows {
 
     // create new sort temp directory
     CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
-    this.dataSorterAndWriterExecutorService =
-        Executors.newFixedThreadPool(parameters.getNumberOfCores());
+    this.dataSorterAndWriterExecutorService = Executors
+        .newFixedThreadPool(parameters.getNumberOfCores(),
+            new CarbonThreadFactory("UnsafeSortDataRowPool:" + parameters.getTableName()));
     semaphore = new Semaphore(parameters.getNumberOfCores());
   }
 
@@ -372,7 +374,8 @@ public class UnsafeSortDataRows {
                   + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
           writeData(page, sortTempFile);
           LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
-              + " and write is: " + (System.currentTimeMillis() - startTime));
+              + " and write is: " + (System.currentTimeMillis() - startTime) + ": location:"
+              + sortTempFile);
           page.freeMemory();
           // add sort temp filename to and arrayList. When the list size reaches 20 then
           // intermediate merging of sort temp files will be triggered
@@ -395,7 +398,7 @@ public class UnsafeSortDataRows {
           page.getBuffer().loadToUnsafe();
           unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(page);
           LOGGER.info(
-              "Time taken to sort row page with size" + page.getBuffer().getActualSize() + "is: "
+              "Time taken to sort row page with size: " + page.getBuffer().getActualSize() + "is: "
                   + (System.currentTimeMillis() - startTime));
         }
       } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 331b9db..5fed2ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -383,7 +383,9 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
    */
   public void close() {
     CarbonUtil.closeStreams(stream);
-    executorService.shutdown();
+    if (null != executorService && !executorService.isShutdown()) {
+      executorService.shutdownNow();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 7f98d72..4303ec8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.util.AbstractQueue;
 import java.util.Arrays;
 import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -40,7 +41,7 @@ import org.apache.carbondata.processing.sort.sortdata.SortParameters;
 import org.apache.carbondata.processing.sort.sortdata.TempSortFileWriter;
 import org.apache.carbondata.processing.sort.sortdata.TempSortFileWriterFactory;
 
-public class UnsafeIntermediateFileMerger implements Runnable {
+public class UnsafeIntermediateFileMerger implements Callable<Void> {
   /**
    * LOGGER
    */
@@ -84,6 +85,8 @@ public class UnsafeIntermediateFileMerger implements Runnable {
 
   private ByteBuffer rowData;
 
+  private Throwable throwable;
+
   /**
    * IntermediateFileMerger Constructor
    */
@@ -99,11 +102,9 @@ public class UnsafeIntermediateFileMerger implements Runnable {
     rowData = ByteBuffer.allocate(2 * 1024 * 1024);
   }
 
-  @Override
-  public void run() {
+  @Override public Void call() throws Exception {
     long intermediateMergeStartTime = System.currentTimeMillis();
     int fileConterConst = fileCounter;
-    boolean isFailed = false;
     try {
       startSorting();
       initialize();
@@ -116,24 +117,30 @@ public class UnsafeIntermediateFileMerger implements Runnable {
           + " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
     } catch (Exception e) {
       LOGGER.error(e, "Problem while intermediate merging");
-      isFailed = true;
+      clear();
+      throwable = e;
     } finally {
       CarbonUtil.closeStreams(this.stream);
       if (null != writer) {
         writer.finish();
       }
-      if (!isFailed) {
+      if (null == throwable) {
         try {
           finish();
         } catch (CarbonSortKeyAndGroupByException e) {
           LOGGER.error(e, "Problem while deleting the merge file");
+          throwable = e;
         }
       } else {
-        if (outPutFile.delete()) {
+        if (!outPutFile.delete()) {
           LOGGER.error("Problem while deleting the merge file");
         }
       }
     }
+    if (null != throwable) {
+      throw new CarbonSortKeyAndGroupByException(throwable);
+    }
+    return null;
   }
 
   /**
@@ -351,12 +358,7 @@ public class UnsafeIntermediateFileMerger implements Runnable {
   }
 
   private void finish() throws CarbonSortKeyAndGroupByException {
-    if (recordHolderHeap != null) {
-      int size = recordHolderHeap.size();
-      for (int i = 0; i < size; i++) {
-        recordHolderHeap.poll().close();
-      }
-    }
+    clear();
     try {
       CarbonUtil.deleteFiles(intermediateFiles);
       rowData.clear();
@@ -364,4 +366,16 @@ public class UnsafeIntermediateFileMerger implements Runnable {
       throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
     }
   }
+
+  private void clear() {
+    if (null != recordHolderHeap) {
+      SortTempChunkHolder sortTempChunkHolder;
+      while (!recordHolderHeap.isEmpty()) {
+        sortTempChunkHolder = recordHolderHeap.poll();
+        if (null != sortTempChunkHolder) {
+          sortTempChunkHolder.close();
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index c774d8f..0d24e01 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -20,14 +20,16 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
@@ -55,20 +57,19 @@ public class UnsafeIntermediateMerger {
 
   private final Object lockObject = new Object();
 
-  private boolean offHeap;
-
   private List<File> procFiles;
 
+  private List<Future<Void>> mergerTask;
+
   public UnsafeIntermediateMerger(SortParameters parameters) {
     this.parameters = parameters;
     // processed file list
     this.rowPages = new ArrayList<UnsafeCarbonRowPage>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     this.mergedPages = new ArrayList<>();
-    this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores());
-    this.offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
-            CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
+    this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(),
+        new CarbonThreadFactory("UnsafeIntermediatePool:" + parameters.getTableName()));
     this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    this.mergerTask = new ArrayList<>();
   }
 
   public void addDataChunkToMerge(UnsafeCarbonRowPage rowPage) {
@@ -116,7 +117,7 @@ public class UnsafeIntermediateMerger {
             .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
     UnsafeIntermediateFileMerger merger =
         new UnsafeIntermediateFileMerger(parameters, intermediateFiles, file);
-    executorService.execute(merger);
+    mergerTask.add(executorService.submit(merger));
   }
 
   public void startInmemoryMergingIfPossible() throws CarbonSortKeyAndGroupByException {
@@ -167,16 +168,28 @@ public class UnsafeIntermediateMerger {
     } catch (InterruptedException e) {
       throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
     }
+    checkForFailure();
   }
 
   public void close() {
-    if (executorService.isShutdown()) {
+    if (!executorService.isShutdown()) {
       executorService.shutdownNow();
     }
     rowPages.clear();
     rowPages = null;
   }
 
+  private void checkForFailure() throws CarbonSortKeyAndGroupByException {
+    for (int i = 0; i < mergerTask.size(); i++) {
+      try {
+        mergerTask.get(i).get();
+      } catch (InterruptedException | ExecutionException e) {
+        LOGGER.error(e, e.getMessage());
+        throw new CarbonSortKeyAndGroupByException(e.getMessage(), e);
+      }
+    }
+  }
+
   public List<UnsafeCarbonRowPage> getRowPages() {
     return rowPages;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 32b31d7..eb38efe 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -97,6 +97,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
         LOGGER.info("No files to merge sort");
         return;
       }
+      LOGGER.info("Starting final merger");
       LOGGER.info("Number of row pages: " + this.fileCounter);
 
       // create record holder heap
@@ -143,7 +144,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
       LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
     } catch (Exception e) {
       LOGGER.error(e);
-      throw new CarbonDataWriterException(e.getMessage());
+      throw new CarbonDataWriterException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
index 70a1254..ae7ece1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
@@ -64,7 +65,9 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
   @Override public void initialize() throws IOException {
     super.initialize();
     rowParser = new RowParserImpl(getOutput(), configuration);
-    executorService = Executors.newCachedThreadPool();
+    executorService = Executors.newCachedThreadPool(new CarbonThreadFactory(
+        "InputProcessorPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier()
+            .getTableName()));
   }
 
   @Override public Iterator<CarbonRowBatch>[] execute() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index ffe6fb6..d4a8dd6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -25,6 +25,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.AbstractQueue;
 import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -33,7 +34,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 
-public class IntermediateFileMerger implements Runnable {
+public class IntermediateFileMerger implements Callable<Void> {
   /**
    * LOGGER
    */
@@ -88,6 +89,8 @@ public class IntermediateFileMerger implements Runnable {
 
   private boolean[] noDictionarycolumnMapping;
 
+  private Throwable throwable;
+
   /**
    * IntermediateFileMerger Constructor
    */
@@ -100,11 +103,9 @@ public class IntermediateFileMerger implements Runnable {
     noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
   }
 
-  @Override
-  public void run() {
+  @Override public Void call() throws Exception {
     long intermediateMergeStartTime = System.currentTimeMillis();
     int fileConterConst = fileCounter;
-    boolean isFailed = false;
     try {
       startSorting();
       initialize();
@@ -129,25 +130,31 @@ public class IntermediateFileMerger implements Runnable {
           " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
     } catch (Exception e) {
       LOGGER.error(e, "Problem while intermediate merging");
-      isFailed = true;
+      clear();
+      throwable = e;
     } finally {
       records = null;
       CarbonUtil.closeStreams(this.stream);
       if (null != writer) {
         writer.finish();
       }
-      if (!isFailed) {
+      if (null == throwable) {
         try {
           finish();
         } catch (CarbonSortKeyAndGroupByException e) {
           LOGGER.error(e, "Problem while deleting the merge file");
+          throwable = e;
         }
       } else {
-        if (outPutFile.delete()) {
+        if (!outPutFile.delete()) {
           LOGGER.error("Problem while deleting the merge file");
         }
       }
     }
+    if (null != throwable) {
+      throw new CarbonSortKeyAndGroupByException(throwable);
+    }
+    return null;
   }
 
   /**
@@ -251,7 +258,7 @@ public class IntermediateFileMerger implements Runnable {
               mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
               mergerParameters.getMeasureDataType(),
               mergerParameters.getNoDictionaryDimnesionColumn(),
-              mergerParameters.getNoDictionarySortColumn());
+              mergerParameters.getNoDictionarySortColumn(), mergerParameters.getTableName());
 
       // initialize
       sortTempFileChunkHolder.initialize();
@@ -370,16 +377,24 @@ public class IntermediateFileMerger implements Runnable {
   }
 
   private void finish() throws CarbonSortKeyAndGroupByException {
-    if (recordHolderHeap != null) {
-      int size = recordHolderHeap.size();
-      for (int i = 0; i < size; i++) {
-        recordHolderHeap.poll().closeStream();
-      }
-    }
+    clear();
     try {
       CarbonUtil.deleteFiles(intermediateFiles);
     } catch (IOException e) {
       throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
     }
   }
+
+  private void clear() {
+    if (recordHolderHeap != null) {
+      SortTempFileChunkHolder sortTempFileChunkHolder;
+      while (!recordHolderHeap.isEmpty()) {
+        sortTempFileChunkHolder = recordHolderHeap.poll();
+        if (null != sortTempFileChunkHolder) {
+          sortTempFileChunkHolder.closeStream();
+        }
+      }
+    }
+    recordHolderHeap = null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index 6d6ff94..db4c771 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -24,8 +24,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.CarbonIterator;
@@ -105,6 +108,12 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
 
   private boolean[] isNoDictionarySortColumn;
 
+  private int maxThreadForSorting;
+
+  private ExecutorService executorService;
+
+  private List<Future<Void>> mergerTask;
+
   public SingleThreadFinalSortFilesMerger(String[] tempFileLocation, String tableName,
       int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount,
       DataType[] type, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) {
@@ -117,6 +126,15 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
     this.noDictionaryCount = noDictionaryCount;
     this.isNoDictionaryColumn = isNoDictionaryColumn;
     this.isNoDictionarySortColumn = isNoDictionarySortColumn;
+    try {
+      maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD,
+              CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE));
+    } catch (NumberFormatException e) {
+      maxThreadForSorting =
+          Integer.parseInt(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE);
+    }
+    this.mergerTask = new ArrayList<>();
   }
 
   /**
@@ -174,6 +192,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
         .getFileBufferSize(this.fileCounter, CarbonProperties.getInstance(),
             CarbonCommonConstants.CONSTANT_SIZE_TEN);
 
+    LOGGER.info("Started Final Merge");
+
     LOGGER.info("Number of temp file: " + this.fileCounter);
 
     LOGGER.info("File Buffer Size: " + this.fileBufferSize);
@@ -183,51 +203,51 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
 
     // iterate over file list and create chunk holder and add to heap
     LOGGER.info("Started adding first record from each file");
-    int maxThreadForSorting = 0;
-    try {
-      maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD,
-              CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE));
-    } catch (NumberFormatException e) {
-      maxThreadForSorting =
-          Integer.parseInt(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE);
-    }
-    ExecutorService service = Executors.newFixedThreadPool(maxThreadForSorting);
+    this.executorService = Executors.newFixedThreadPool(maxThreadForSorting);
 
     for (final File tempFile : files) {
 
-      Runnable runnable = new Runnable() {
-        @Override public void run() {
-
+      Callable<Void> callable = new Callable<Void>() {
+        @Override public Void call() throws CarbonSortKeyAndGroupByException {
             // create chunk holder
             SortTempFileChunkHolder sortTempFileChunkHolder =
                 new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
                     measureCount, fileBufferSize, noDictionaryCount, measureDataType,
-                    isNoDictionaryColumn, isNoDictionarySortColumn);
+                    isNoDictionaryColumn, isNoDictionarySortColumn, tableName);
           try {
             // initialize
             sortTempFileChunkHolder.initialize();
             sortTempFileChunkHolder.readRow();
           } catch (CarbonSortKeyAndGroupByException ex) {
-            LOGGER.error(ex);
+            sortTempFileChunkHolder.closeStream();
+            notifyFailure(ex);
           }
-
           synchronized (LOCKOBJECT) {
             recordHolderHeapLocal.add(sortTempFileChunkHolder);
           }
+          return null;
         }
       };
-      service.execute(runnable);
+      mergerTask.add(executorService.submit(callable));
     }
-    service.shutdown();
-
+    executorService.shutdown();
     try {
-      service.awaitTermination(2, TimeUnit.HOURS);
+      executorService.awaitTermination(2, TimeUnit.HOURS);
     } catch (Exception e) {
       throw new CarbonDataWriterException(e.getMessage(), e);
     }
+    checkFailure();
+    LOGGER.info("final merger Heap Size" + this.recordHolderHeapLocal.size());
+  }
 
-    LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
+  private void checkFailure() {
+    for (int i = 0; i < mergerTask.size(); i++) {
+      try {
+        mergerTask.get(i).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new CarbonDataWriterException(e);
+      }
+    }
   }
 
   /**
@@ -239,6 +259,11 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
     this.recordHolderHeapLocal = new PriorityQueue<SortTempFileChunkHolder>(fileCounter);
   }
 
+  private synchronized void notifyFailure(Throwable throwable) {
+    close();
+    LOGGER.error(throwable);
+  }
+
   /**
    * This method will be used to get the sorted row
    *
@@ -284,6 +309,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
     try {
       poll.readRow();
     } catch (CarbonSortKeyAndGroupByException e) {
+      close();
       throw new CarbonDataWriterException(e.getMessage(), e);
     }
 
@@ -304,9 +330,18 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
     return this.fileCounter > 0;
   }
 
-  public void clear() {
+  public void close() {
+    if (null != executorService && !executorService.isShutdown()) {
+      executorService.shutdownNow();
+    }
     if (null != recordHolderHeapLocal) {
-      recordHolderHeapLocal = null;
+      SortTempFileChunkHolder sortTempFileChunkHolder;
+      while (!recordHolderHeapLocal.isEmpty()) {
+        sortTempFileChunkHolder = recordHolderHeapLocal.poll();
+        if (null != sortTempFileChunkHolder) {
+          sortTempFileChunkHolder.closeStream();
+        }
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index fc744a6..11df276 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
@@ -102,8 +103,9 @@ public class SortDataRows {
 
     // create new sort temp directory
     CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
-    this.dataSorterAndWriterExecutorService =
-        Executors.newFixedThreadPool(parameters.getNumberOfCores());
+    this.dataSorterAndWriterExecutorService = Executors
+        .newFixedThreadPool(parameters.getNumberOfCores(),
+            new CarbonThreadFactory("SortDataRowPool:" + parameters.getTableName()));
     semaphore = new Semaphore(parameters.getNumberOfCores());
   }
 
@@ -128,9 +130,9 @@ public class SortDataRows {
         semaphore.acquire();
         dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal));
       } catch (InterruptedException e) {
-        LOGGER.error(
-            "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
-        throw new CarbonSortKeyAndGroupByException(e.getMessage());
+        LOGGER.error(e,
+            "exception occurred while trying to acquire a semaphore lock: ");
+        throw new CarbonSortKeyAndGroupByException(e);
       }
       // create the new holder Array
       this.recordHolderList = new Object[this.sortBufferSize][];
@@ -379,14 +381,21 @@ public class SortDataRows {
      * @throws CarbonSortKeyAndGroupByException
      */
     public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException {
-      dataSorterAndWriterExecutorService.shutdownNow();
-      intermediateFileMerger.close();
+      close();
       parameters.getObserver().setFailed(true);
       LOGGER.error(exception);
       throw new CarbonSortKeyAndGroupByException(exception);
     }
   }
 
+  public void close() {
+    if (null != dataSorterAndWriterExecutorService && !dataSorterAndWriterExecutorService
+        .isShutdown()) {
+      dataSorterAndWriterExecutorService.shutdownNow();
+    }
+    intermediateFileMerger.close();
+  }
+
   /**
    * This class is responsible for sorting and writing the object
    * array which holds the records equal to given array size

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
index d234ce2..9c995a5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
@@ -20,13 +20,16 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 
 /**
@@ -50,11 +53,15 @@ public class SortIntermediateFileMerger {
 
   private final Object lockObject = new Object();
 
+  private List<Future<Void>> mergerTask;
+
   public SortIntermediateFileMerger(SortParameters parameters) {
     this.parameters = parameters;
     // processed file list
     this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores());
+    this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(),
+        new CarbonThreadFactory("SafeIntermediateMergerPool:" + parameters.getTableName()));
+    mergerTask = new ArrayList<>();
   }
 
   public void addFileToMerge(File sortTempFile) {
@@ -91,7 +98,7 @@ public class SortIntermediateFileMerger {
         chosenTempDir + File.separator + parameters.getTableName() + System
             .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
     IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file);
-    executorService.execute(merger);
+    mergerTask.add(executorService.submit(merger));
   }
 
   public void finish() throws CarbonSortKeyAndGroupByException {
@@ -103,10 +110,22 @@ public class SortIntermediateFileMerger {
     }
     procFiles.clear();
     procFiles = null;
+    checkForFailure();
+  }
+
+  private void checkForFailure() throws CarbonSortKeyAndGroupByException {
+    for (int i = 0; i < mergerTask.size(); i++) {
+      try {
+        mergerTask.get(i).get();
+      } catch (InterruptedException | ExecutionException e) {
+        LOGGER.error(e, e.getMessage());
+        throw new CarbonSortKeyAndGroupByException(e);
+      }
+    }
   }
 
   public void close() {
-    if (executorService.isShutdown()) {
+    if (!executorService.isShutdown()) {
       executorService.shutdownNow();
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index c4b0b31..3e56605 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.NonDictionaryUtil;
@@ -153,7 +154,8 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    */
   public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
       int measureCount, int fileBufferSize, int noDictionaryCount, DataType[] aggType,
-      boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn) {
+      boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn,
+      String tableName) {
     // set temp file
     this.tempFile = tempFile;
 
@@ -165,7 +167,8 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     this.noDictionaryCount = noDictionaryCount;
     // set mdkey length
     this.fileBufferSize = fileBufferSize;
-    this.executorService = Executors.newFixedThreadPool(1);
+    this.executorService = Executors
+        .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
     this.aggType = aggType;
 
     this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn;
@@ -407,7 +410,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    */
   public void closeStream() {
     CarbonUtil.closeStreams(stream);
-    executorService.shutdown();
+    if (null != executorService) {
+      executorService.shutdownNow();
+    }
     this.backupBuffer = null;
     this.currentBuffer = null;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 2c275bf..78f1637 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
@@ -238,11 +239,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
 
     blockletProcessingCount = new AtomicInteger(0);
-    producerExecutorService = Executors.newFixedThreadPool(numberOfCores);
+    producerExecutorService = Executors.newFixedThreadPool(numberOfCores,
+        new CarbonThreadFactory("ProducerPool:" + model.getTableName()));
     producerExecutorServiceTaskList =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     LOGGER.info("Initializing writer executors");
-    consumerExecutorService = Executors.newFixedThreadPool(1);
+    consumerExecutorService = Executors
+        .newFixedThreadPool(1, new CarbonThreadFactory("ConsumerPool:" + model.getTableName()));
     consumerExecutorServiceTaskList = new ArrayList<>(1);
     semaphore = new Semaphore(numberOfCores);
     tablePageList = new TablePageList();
@@ -357,12 +360,20 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   public void finish() throws CarbonDataWriterException {
     // still some data is present in stores if entryCount is more
     // than 0
+    if (null == dataWriter) {
+      return;
+    }
+    if (producerExecutorService.isShutdown()) {
+      return;
+    }
+    LOGGER.info("Started Finish Operation");
     try {
       semaphore.acquire();
       producerExecutorServiceTaskList.add(producerExecutorService
           .submit(new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, true)));
       blockletProcessingCount.incrementAndGet();
       processedDataCount += entryCount;
+      LOGGER.info("Total Number Of records added to store: " + processedDataCount);
       closeWriterExecutionService(producerExecutorService);
       processWriteTaskSubmitList(producerExecutorServiceTaskList);
       processingComplete = true;
@@ -666,6 +677,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       return tablePage;
     }
 
+    /**
+     * @param tablePage
+     * @param index
+     */
     public synchronized void put(TablePage tablePage, int index) {
       tablePages[index] = tablePage;
       // notify the consumer thread when index at which object is to be inserted

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index acb3b3b..972e414 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -52,6 +52,7 @@ import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonMergerUtil;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
@@ -174,7 +175,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " +
         blockSizeThreshold);
 
-    this.executorService = Executors.newFixedThreadPool(1);
+    this.executorService = Executors.newFixedThreadPool(1,
+        new CarbonThreadFactory("LocalToHDFSCopyPool:" + dataWriterVo.getTableName()));
     executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     // in case of compaction we will pass the cardinality.
     this.localCardinality = dataWriterVo.getColCardinality();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index 70a8703..d8ae8ff 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -99,6 +99,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
       buffer.flip();
       channel.write(buffer);
     } catch (IOException e) {
+      LOGGER.error(e, "Problem while writing the carbon file");
       throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
     }
   }
@@ -184,7 +185,8 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
       }
       pageId = 0;
     } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem when writing file", e);
+      LOGGER.error(e, "Problem while writing file");
+      throw new CarbonDataWriterException("Problem while writing file", e);
     }
     // clear the data holder
     blockletDataHolder.clear();
@@ -213,6 +215,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
         measureStartIndex++;
       }
     } catch (IOException e) {
+      LOGGER.error(e, "Problem while getting the data chunks");
       throw new CarbonDataWriterException("Problem while getting the data chunks", e);
     }
     return size;
@@ -346,6 +349,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
     try {
       writeIndexFile();
     } catch (IOException e) {
+      LOGGER.error(e, "Problem while writing the index file");
       throw new CarbonDataWriterException("Problem while writing the index file", e);
     }
     closeExecutorService();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 8681269..0b88684 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -67,6 +67,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -216,7 +217,8 @@ public final class CarbonLoaderUtil {
       throw new RuntimeException("Store location not set for the key " + tempLocationKey);
     }
     // submit local folder clean up in another thread so that main thread execution is not blocked
-    ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1);
+    ExecutorService localFolderDeletionService = Executors
+        .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName));
     try {
       localFolderDeletionService.submit(new Callable<Void>() {
         @Override public Void call() throws Exception {