You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2019/03/13 06:27:11 UTC

[carbondata] branch master updated: [CARBONDATA-3304] Distinguish the thread names created by thread pool of CarbonThreadFactory

This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c55a5d0  [CARBONDATA-3304] Distinguish the thread names created by thread pool of CarbonThreadFactory
c55a5d0 is described below

commit c55a5d0b5bb6b7f41712d3526b1a65c6079895be
Author: qiuchenjian <80...@qq.com>
AuthorDate: Wed Feb 27 22:04:36 2019 +0800

    [CARBONDATA-3304] Distinguish the thread names created by thread pool of CarbonThreadFactory
    
    This closes #3137
---
 .../impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java     | 10 +++-------
 .../org/apache/carbondata/core/memory/UnsafeMemoryManager.java |  3 +--
 .../apache/carbondata/hadoop/api/CarbonTableOutputFormat.java  |  3 ++-
 .../processing/loading/TableProcessingOperations.java          |  3 ++-
 .../processing/loading/converter/impl/RowConverterImpl.java    |  2 +-
 .../loading/sort/impl/ParallelReadMergeSorterImpl.java         |  3 ++-
 .../loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java   |  3 ++-
 .../processing/loading/sort/unsafe/UnsafeSortDataRows.java     |  9 ++++-----
 .../sort/unsafe/merger/UnsafeIntermediateFileMerger.java       |  2 +-
 .../loading/sort/unsafe/merger/UnsafeIntermediateMerger.java   |  5 +++--
 .../loading/steps/CarbonRowDataWriterProcessorStepImpl.java    |  2 +-
 .../processing/loading/steps/DataWriterProcessorStepImpl.java  |  2 +-
 .../processing/loading/steps/InputProcessorStepImpl.java       |  2 +-
 .../carbondata/processing/sort/sortdata/SortDataRows.java      |  3 ++-
 .../processing/sort/sortdata/SortIntermediateFileMerger.java   |  3 ++-
 .../processing/sort/sortdata/SortTempFileChunkHolder.java      |  3 ++-
 .../processing/store/writer/AbstractFactDataWriter.java        |  6 ++++--
 17 files changed, 34 insertions(+), 30 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
index 0150179..ca1bfa7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
@@ -21,7 +21,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryBlock;
-import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
@@ -74,12 +73,9 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension
    */
   public UnsafeAbstractDimensionDataChunkStore(long totalSize, boolean isInvertedIdex,
       int numberOfRows, int dataLength) {
-    try {
-      // allocating the data page
-      this.dataPageMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, totalSize);
-    } catch (MemoryException e) {
-      throw new RuntimeException(e);
-    }
+    // allocating the data page
+    this.dataPageMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, totalSize);
+
     this.dataLength = dataLength;
     this.isExplicitSorted = isInvertedIdex;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index c59698f..f4c4f85 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -185,8 +185,7 @@ public class UnsafeMemoryManager {
   /**
    * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
    */
-  public static MemoryBlock allocateMemoryWithRetry(String taskId, long size)
-      throws MemoryException {
+  public static MemoryBlock allocateMemoryWithRetry(String taskId, long size) {
     return allocateMemoryWithRetry(INSTANCE.memoryType, taskId, size);
   }
 
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 85fb315..9ba5e97 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -262,7 +262,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     DataTypeUtil.clearFormatter();
     final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
     final ExecutorService executorService = Executors.newFixedThreadPool(1,
-        new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));
+        new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName(),
+                true));
     // It should be started in new thread as the underlying iterator uses blocking queue.
     Future future = executorService.submit(new Thread() {
       @Override public void run() {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index f08de59..d67979a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -126,7 +126,8 @@ public class TableProcessingOperations {
     }
     // submit local folder clean up in another thread so that main thread execution is not blocked
     ExecutorService localFolderDeletionService = Executors
-        .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName));
+        .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName,
+                true));
     try {
       localFolderDeletionService.submit(new Callable<Void>() {
         @Override public Void call() throws Exception {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index df50e25..ac9413c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -121,7 +121,7 @@ public class RowConverterImpl implements RowConverter {
       if (executorService == null) {
         executorService = Executors.newCachedThreadPool(new CarbonThreadFactory(
             "DictionaryClientPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier()
-                .getTableName()));
+                .getTableName(), true));
       }
       DictionaryOnePassService
           .setDictionaryServiceProvider(configuration.getDictionaryServiceProvider());
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index 02d6309..61869c5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -94,7 +94,8 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
       throw new CarbonDataLoadingException(e);
     }
     this.executorService = Executors.newFixedThreadPool(iterators.length,
-        new CarbonThreadFactory("SafeParallelSorterPool:" + sortParameters.getTableName()));
+        new CarbonThreadFactory("SafeParallelSorterPool:" + sortParameters.getTableName(),
+                true));
     this.threadStatusObserver = new ThreadStatusObserver(executorService);
 
     try {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index 8af3ae2..aaa40e0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -86,7 +86,8 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
       throw new CarbonDataLoadingException(e);
     }
     this.executorService = Executors.newFixedThreadPool(iterators.length,
-        new CarbonThreadFactory("UnsafeParallelSorterPool:" + sortParameters.getTableName()));
+        new CarbonThreadFactory("UnsafeParallelSorterPool:" + sortParameters.getTableName(),
+                true));
     this.threadStatusObserver = new ThreadStatusObserver(executorService);
 
     try {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 87f97be..60dd7f1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -138,7 +138,8 @@ public class UnsafeSortDataRows {
     CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
     this.dataSorterAndWriterExecutorService = Executors
         .newFixedThreadPool(parameters.getNumberOfCores(),
-            new CarbonThreadFactory("UnsafeSortDataRowPool:" + parameters.getTableName()));
+            new CarbonThreadFactory("UnsafeSortDataRowPool:" + parameters.getTableName(),
+                    true));
     semaphore = new Semaphore(parameters.getNumberOfCores());
   }
 
@@ -206,8 +207,7 @@ public class UnsafeSortDataRows {
         }
         bytesAdded += rowPage.addRow(rowBatch[i], reUsableByteArrayDataOutputStream.get());
       } catch (Exception e) {
-        if (e.getMessage().contains("cannot handle this row. create new page"))
-        {
+        if (e.getMessage().contains("cannot handle this row. create new page")) {
           rowPage.makeCanAddFail();
           // so that same rowBatch will be handled again in new page
           i--;
@@ -243,8 +243,7 @@ public class UnsafeSortDataRows {
       }
       rowPage.addRow(row, reUsableByteArrayDataOutputStream.get());
     } catch (Exception e) {
-      if (e.getMessage().contains("cannot handle this row. create new page"))
-      {
+      if (e.getMessage().contains("cannot handle this row. create new page")) {
         rowPage.makeCanAddFail();
         addRow(row);
       } else {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 041544b..f7e38b3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -103,7 +103,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
       }
       double intermediateMergeCostTime =
           (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
-      LOGGER.info("============================== Intermediate Merge of " + fileConterConst
+      LOGGER.info("Intermediate Merge of " + fileConterConst
           + " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
     } catch (Exception e) {
       LOGGER.error("Problem while intermediate merging", e);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index ea12263..1b44cc6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -75,7 +75,8 @@ public class UnsafeIntermediateMerger {
     this.rowPages = new ArrayList<UnsafeCarbonRowPage>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     this.mergedPages = new ArrayList<>();
     this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(),
-        new CarbonThreadFactory("UnsafeIntermediatePool:" + parameters.getTableName()));
+        new CarbonThreadFactory("UnsafeIntermediatePool:" + parameters.getTableName(),
+                true));
     this.procFiles = new ArrayList<>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     this.mergerTask = new ArrayList<>();
 
@@ -182,7 +183,7 @@ public class UnsafeIntermediateMerger {
    * @param spillDisk whether to spill the merged result to disk
    */
   private void startIntermediateMerging(UnsafeCarbonRowPage[] rowPages, int totalRows,
-      boolean spillDisk) throws CarbonSortKeyAndGroupByException {
+      boolean spillDisk) {
     UnsafeInMemoryIntermediateDataMerger merger =
         new UnsafeInMemoryIntermediateDataMerger(rowPages, totalRows, parameters, spillDisk);
     mergedPages.add(merger);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 184248c..6345035 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -142,7 +142,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       } else {
         executorService = Executors.newFixedThreadPool(iterators.length,
             new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier()
-                .getCarbonTableIdentifier().getTableName()));
+                .getCarbonTableIdentifier().getTableName(), true));
         Future[] futures = new Future[iterators.length];
         for (int i = 0; i < iterators.length; i++) {
           futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i, dataHandler));
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index 7beca48..d1b1e76 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -115,7 +115,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
           .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID,
               System.currentTimeMillis());
       rangeExecutorService = Executors.newFixedThreadPool(iterators.length,
-          new CarbonThreadFactory("WriterForwardPool: " + tableName));
+          new CarbonThreadFactory("WriterForwardPool: " + tableName, true));
       List<Future<Void>> rangeExecutorServiceSubmitList = new ArrayList<>(iterators.length);
       int i = 0;
       // do this concurrently
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
index f540b3e..c44c3f5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -71,7 +71,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
     rowParser = new RowParserImpl(getOutput(), configuration);
     executorService = Executors.newCachedThreadPool(new CarbonThreadFactory(
         "InputProcessorPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier()
-            .getTableName()));
+            .getTableName(), true));
     // if logger is enabled then raw data will be required.
     this.isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(configuration);
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index 128547d..174d5d1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -112,7 +112,8 @@ public class SortDataRows {
     CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
     this.dataSorterAndWriterExecutorService = Executors
         .newFixedThreadPool(parameters.getNumberOfCores(),
-            new CarbonThreadFactory("SortDataRowPool:" + parameters.getTableName()));
+            new CarbonThreadFactory("SortDataRowPool:" + parameters.getTableName(),
+                    true));
     semaphore = new Semaphore(parameters.getNumberOfCores());
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
index 1f4f1e7..7079443 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
@@ -61,7 +61,8 @@ public class SortIntermediateFileMerger {
     // processed file list
     this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(),
-        new CarbonThreadFactory("SafeIntermediateMergerPool:" + parameters.getTableName()));
+        new CarbonThreadFactory("SafeIntermediateMergerPool:" + parameters.getTableName(),
+                true));
     mergerTask = new ArrayList<>();
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index eeea2ec..2ae90fa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -124,7 +124,8 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     this.compressorName = sortParameters.getSortTempCompressorName();
     this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
     this.executorService = Executors
-        .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
+        .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName,
+                true));
     this.convertToActualField = convertToActualField;
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 472f143..eb1b15d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -186,7 +186,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     }
 
     this.executorService = Executors.newFixedThreadPool(1,
-        new CarbonThreadFactory("CompleteHDFSBackendPool:" + this.model.getTableName()));
+        new CarbonThreadFactory("CompleteHDFSBackendPool:" + this.model.getTableName(),
+                true));
     executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     // in case of compaction we will pass the cardinality.
     this.localCardinality = this.model.getColCardinality();
@@ -208,7 +209,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
         numberOfCores = model.getNumberOfCores() / 2;
       }
       fallbackExecutorService = Executors.newFixedThreadPool(numberOfCores, new CarbonThreadFactory(
-          "FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId()));
+          "FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId(),
+              true));
     }
   }