You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/08/08 12:13:31 UTC

carbondata git commit: [CARBONDATA-2817]Thread Leak in Update and in No sort flow

Repository: carbondata
Updated Branches:
  refs/heads/master 8f7b594a3 -> 7158d5203


[CARBONDATA-2817]Thread Leak in Update and in No sort flow

Issue :- After the Update command finishes, loading threads are not getting stopped.

Root Cause :-

In Update flow DataLoadExecutor 's close method is not called so all Executors services are not closed.
Exceptions are not handled properly in the AFDW class's closeExecutorService(), which is causing a thread leak if the job is killed from the Spark UI.
Solution :-

Add a task completion listener and call the close method of DataLoadExecutor from it.
Handle exceptions in closeExecutorService so that all writer-step threads can be closed.

This closes #2606


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7158d520
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7158d520
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7158d520

Branch: refs/heads/master
Commit: 7158d5203d84feaef23a5bb17a90b67c79ba52d0
Parents: 8f7b594
Author: BJangir <ba...@gmail.com>
Authored: Thu Aug 2 21:51:07 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Wed Aug 8 17:42:04 2018 +0530

----------------------------------------------------------------------
 .../core/util/BlockletDataMapUtil.java          |  4 +-
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |  9 +++-
 .../CarbonRowDataWriterProcessorStepImpl.java   | 52 +++++++++++++++++---
 .../steps/DataWriterBatchProcessorStepImpl.java | 25 ++++++++--
 .../store/writer/AbstractFactDataWriter.java    | 16 ++++--
 .../writer/v3/CarbonFactDataWriterImplV3.java   | 19 +++++--
 6 files changed, 103 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7158d520/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 68ce1fb..404b426 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -115,7 +115,7 @@ public class BlockletDataMapUtil {
         CarbonTable.updateTableByTableInfo(carbonTable, carbonTable.getTableInfo());
       }
       String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
-      if (null != fileNameToMetaInfoMapping && null == blockMetaInfoMap.get(blockPath)) {
+      if (null == blockMetaInfoMap.get(blockPath)) {
         BlockMetaInfo blockMetaInfo = createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath);
         // if blockMetaInfo is null that means the file has been deleted from the file system.
         // This can happen in case IUD scenarios where after deleting or updating the data the
@@ -123,8 +123,6 @@ public class BlockletDataMapUtil {
         if (null != blockMetaInfo) {
           blockMetaInfoMap.put(blockPath, blockMetaInfo);
         }
-      } else {
-        blockMetaInfoMap.put(blockPath, new BlockMetaInfo(new String[] {},0));
       }
     }
     return blockMetaInfoMap;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7158d520/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index 2e7c307..f4fdbc1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -25,8 +25,10 @@ import org.apache.spark.sql.Row
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo
 import org.apache.carbondata.processing.loading.{DataLoadExecutor, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Data load in case of update command .
@@ -54,7 +56,12 @@ object UpdateDataLoad {
       loader.initialize()
 
       loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
-      new DataLoadExecutor().execute(carbonLoadModel,
+      val executor = new DataLoadExecutor
+      TaskContext.get().addTaskCompletionListener { context =>
+        executor.close()
+        CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+      }
+      executor.execute(carbonLoadModel,
         loader.storeLocation,
         recordReaders.toArray)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7158d520/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 1a05b12..ac13d24 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -18,7 +18,9 @@ package org.apache.carbondata.processing.loading.steps;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -80,11 +82,16 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
 
   private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;
 
+  private List<CarbonFactHandler> carbonFactHandlers;
+
+  private ExecutorService executorService = null;
+
   public CarbonRowDataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
     this.localDictionaryGeneratorMap =
         CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
+    this.carbonFactHandlers = new CopyOnWriteArrayList<>();
   }
 
   @Override public void initialize() throws IOException {
@@ -107,7 +114,6 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     final Iterator<CarbonRowBatch>[] iterators = child.execute();
     tableIdentifier = configuration.getTableIdentifier().getCarbonTableIdentifier();
     tableName = tableIdentifier.getTableName();
-    ExecutorService executorService = null;
     try {
       readCounter = new long[iterators.length];
       writeCounter = new long[iterators.length];
@@ -149,10 +155,6 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
         throw new BadRecordFoundException(e.getMessage(), e);
       }
       throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
-    } finally {
-      if (null != executorService && executorService.isShutdown()) {
-        executorService.shutdownNow();
-      }
     }
     return null;
   }
@@ -169,13 +171,20 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       if (rowsNotExist) {
         rowsNotExist = false;
         dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
+        this.carbonFactHandlers.add(dataHandler);
         dataHandler.initialise();
       }
       processBatch(iterator.next(), dataHandler, iteratorIndex);
     }
-    if (!rowsNotExist) {
-      finish(dataHandler, iteratorIndex);
+    try {
+      if (!rowsNotExist) {
+        finish(dataHandler, iteratorIndex);
+      }
+    } finally {
+      carbonFactHandlers.remove(dataHandler);
     }
+
+
   }
 
   @Override protected String getStepName() {
@@ -183,10 +192,15 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
   }
 
   private void finish(CarbonFactHandler dataHandler, int iteratorIndex) {
+    CarbonDataWriterException exception = null;
     try {
       dataHandler.finish();
     } catch (Exception e) {
+      // if throw exception from here dataHandler will not be closed.
+      // so just holding exception and later throwing exception
       LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
+      exception = new CarbonDataWriterException(
+          "Failed for table: " + tableName + " in  finishing data handler", e);
     }
     LOGGER.info("Record Processed For table: " + tableName);
     String logMessage =
@@ -194,13 +208,20 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
             + ": Write: " + readCounter[iteratorIndex];
     LOGGER.info(logMessage);
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
-    processingComplete(dataHandler);
+    try {
+      processingComplete(dataHandler);
+    } catch (CarbonDataLoadingException e) {
+      exception = new CarbonDataWriterException(e.getMessage(), e);
+    }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
             System.currentTimeMillis());
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID,
             System.currentTimeMillis());
+    if (null != exception) {
+      throw exception;
+    }
   }
 
   private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
@@ -310,4 +331,19 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       }
     }
   }
+
+  @Override public void close() {
+    if (!closed) {
+      super.close();
+      if (null != executorService) {
+        executorService.shutdownNow();
+      }
+      if (null != this.carbonFactHandlers && !this.carbonFactHandlers.isEmpty()) {
+        for (CarbonFactHandler carbonFactHandler : this.carbonFactHandlers) {
+          carbonFactHandler.finish();
+          carbonFactHandler.closeHandler();
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7158d520/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
index 5663811..26ae2d7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
@@ -98,8 +99,14 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
                 .createCarbonFactHandler(model);
             carbonFactHandler.initialise();
             processBatch(next, carbonFactHandler);
-            finish(tableName, carbonFactHandler);
-            this.carbonFactHandler = null;
+            try {
+              finish(tableName, carbonFactHandler);
+            } finally {
+              // we need to make carbonFactHandler =null as finish will call closehandler
+              // even finish throws exception
+              // otherwise close() will call finish method again for same handler.
+              this.carbonFactHandler = null;
+            }
           }
         }
         i++;
@@ -119,19 +126,31 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
   }
 
   private void finish(String tableName, CarbonFactHandler dataHandler) {
+    CarbonDataWriterException exception = null;
     try {
       dataHandler.finish();
     } catch (Exception e) {
+      // if throw exception from here dataHandler will not be closed.
+      // so just holding exception and later throwing exception
       LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
+      exception = new CarbonDataWriterException(
+          "Failed for table: " + tableName + " in  finishing data handler", e);
     }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
-    processingComplete(dataHandler);
+    try {
+      processingComplete(dataHandler);
+    } catch (Exception e) {
+      exception = new CarbonDataWriterException(e.getMessage(), e);
+    }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
             System.currentTimeMillis());
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID,
             System.currentTimeMillis());
+    if (null != exception) {
+      throw exception;
+    }
   }
 
   private void processingComplete(CarbonFactHandler dataHandler) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7158d520/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 3e71e45..836e2c8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -415,20 +415,30 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    * @throws CarbonDataWriterException
    */
   protected void closeExecutorService() throws CarbonDataWriterException {
+    CarbonDataWriterException exception = null;
     try {
       listener.finish();
+      listener = null;
+    } catch (IOException e) {
+      exception = new CarbonDataWriterException(e);
+    }
+    try {
       executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.HOURS);
       for (int i = 0; i < executorServiceSubmitList.size(); i++) {
         executorServiceSubmitList.get(i).get();
       }
-      listener = null;
-    } catch (InterruptedException | ExecutionException | IOException e) {
-      throw new CarbonDataWriterException(e);
+    } catch (InterruptedException | ExecutionException e) {
+      if (null == exception) {
+        exception = new CarbonDataWriterException(e);
+      }
     }
     if (null != fallbackExecutorService) {
       fallbackExecutorService.shutdownNow();
     }
+    if (exception != null) {
+      throw exception;
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7158d520/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index dc6e443..f992e44 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -350,14 +350,25 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
    * @throws CarbonDataWriterException
    */
   public void closeWriter() throws CarbonDataWriterException {
-    commitCurrentFile(true);
+    CarbonDataWriterException exception = null;
     try {
+      commitCurrentFile(true);
       writeIndexFile();
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOGGER.error(e, "Problem while writing the index file");
-      throw new CarbonDataWriterException("Problem while writing the index file", e);
+      exception = new CarbonDataWriterException("Problem while writing the index file", e);
+    } finally {
+      try {
+        closeExecutorService();
+      } catch (CarbonDataWriterException e) {
+        if (null == exception) {
+          exception = e;
+        }
+      }
+    }
+    if (null != exception) {
+      throw exception;
     }
-    closeExecutorService();
   }
 
   @Override public void writeFooterToFile() throws CarbonDataWriterException {