You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/04/02 02:41:38 UTC
[carbondata] 18/41: [CARBONDATA-3304] Distinguish the thread names
created by thread pool of CarbonThreadFactory
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit 271fd552fe9b870b08dae67c80ed039d8b0bc993
Author: qiuchenjian <80...@qq.com>
AuthorDate: Wed Feb 27 22:04:36 2019 +0800
[CARBONDATA-3304] Distinguish the thread names created by thread pool of CarbonThreadFactory
This closes #3137
---
.../impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java | 10 +++-------
.../org/apache/carbondata/core/memory/UnsafeMemoryManager.java | 3 +--
.../apache/carbondata/hadoop/api/CarbonTableOutputFormat.java | 3 ++-
.../processing/loading/TableProcessingOperations.java | 3 ++-
.../processing/loading/converter/impl/RowConverterImpl.java | 2 +-
.../loading/sort/impl/ParallelReadMergeSorterImpl.java | 3 ++-
.../loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java | 3 ++-
.../processing/loading/sort/unsafe/UnsafeSortDataRows.java | 9 ++++-----
.../sort/unsafe/merger/UnsafeIntermediateFileMerger.java | 2 +-
.../loading/sort/unsafe/merger/UnsafeIntermediateMerger.java | 5 +++--
.../loading/steps/CarbonRowDataWriterProcessorStepImpl.java | 2 +-
.../processing/loading/steps/DataWriterProcessorStepImpl.java | 2 +-
.../processing/loading/steps/InputProcessorStepImpl.java | 2 +-
.../carbondata/processing/sort/sortdata/SortDataRows.java | 3 ++-
.../processing/sort/sortdata/SortIntermediateFileMerger.java | 3 ++-
.../processing/sort/sortdata/SortTempFileChunkHolder.java | 3 ++-
.../processing/store/writer/AbstractFactDataWriter.java | 6 ++++--
17 files changed, 34 insertions(+), 30 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
index 0150179..ca1bfa7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
@@ -21,7 +21,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.MemoryBlock;
-import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
@@ -74,12 +73,9 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension
*/
public UnsafeAbstractDimensionDataChunkStore(long totalSize, boolean isInvertedIdex,
int numberOfRows, int dataLength) {
- try {
- // allocating the data page
- this.dataPageMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, totalSize);
- } catch (MemoryException e) {
- throw new RuntimeException(e);
- }
+ // allocating the data page
+ this.dataPageMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, totalSize);
+
this.dataLength = dataLength;
this.isExplicitSorted = isInvertedIdex;
}
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index c59698f..f4c4f85 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -185,8 +185,7 @@ public class UnsafeMemoryManager {
/**
* It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
*/
- public static MemoryBlock allocateMemoryWithRetry(String taskId, long size)
- throws MemoryException {
+ public static MemoryBlock allocateMemoryWithRetry(String taskId, long size) {
return allocateMemoryWithRetry(INSTANCE.memoryType, taskId, size);
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 85fb315..9ba5e97 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -262,7 +262,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
DataTypeUtil.clearFormatter();
final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
final ExecutorService executorService = Executors.newFixedThreadPool(1,
- new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));
+ new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName(),
+ true));
// It should be started in new thread as the underlying iterator uses blocking queue.
Future future = executorService.submit(new Thread() {
@Override public void run() {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index f08de59..d67979a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -126,7 +126,8 @@ public class TableProcessingOperations {
}
// submit local folder clean up in another thread so that main thread execution is not blocked
ExecutorService localFolderDeletionService = Executors
- .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName));
+ .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName,
+ true));
try {
localFolderDeletionService.submit(new Callable<Void>() {
@Override public Void call() throws Exception {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index df50e25..ac9413c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -121,7 +121,7 @@ public class RowConverterImpl implements RowConverter {
if (executorService == null) {
executorService = Executors.newCachedThreadPool(new CarbonThreadFactory(
"DictionaryClientPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier()
- .getTableName()));
+ .getTableName(), true));
}
DictionaryOnePassService
.setDictionaryServiceProvider(configuration.getDictionaryServiceProvider());
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index 02d6309..61869c5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -94,7 +94,8 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
throw new CarbonDataLoadingException(e);
}
this.executorService = Executors.newFixedThreadPool(iterators.length,
- new CarbonThreadFactory("SafeParallelSorterPool:" + sortParameters.getTableName()));
+ new CarbonThreadFactory("SafeParallelSorterPool:" + sortParameters.getTableName(),
+ true));
this.threadStatusObserver = new ThreadStatusObserver(executorService);
try {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index 8af3ae2..aaa40e0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -86,7 +86,8 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
throw new CarbonDataLoadingException(e);
}
this.executorService = Executors.newFixedThreadPool(iterators.length,
- new CarbonThreadFactory("UnsafeParallelSorterPool:" + sortParameters.getTableName()));
+ new CarbonThreadFactory("UnsafeParallelSorterPool:" + sortParameters.getTableName(),
+ true));
this.threadStatusObserver = new ThreadStatusObserver(executorService);
try {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 87f97be..60dd7f1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -138,7 +138,8 @@ public class UnsafeSortDataRows {
CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
this.dataSorterAndWriterExecutorService = Executors
.newFixedThreadPool(parameters.getNumberOfCores(),
- new CarbonThreadFactory("UnsafeSortDataRowPool:" + parameters.getTableName()));
+ new CarbonThreadFactory("UnsafeSortDataRowPool:" + parameters.getTableName(),
+ true));
semaphore = new Semaphore(parameters.getNumberOfCores());
}
@@ -206,8 +207,7 @@ public class UnsafeSortDataRows {
}
bytesAdded += rowPage.addRow(rowBatch[i], reUsableByteArrayDataOutputStream.get());
} catch (Exception e) {
- if (e.getMessage().contains("cannot handle this row. create new page"))
- {
+ if (e.getMessage().contains("cannot handle this row. create new page")) {
rowPage.makeCanAddFail();
// so that same rowBatch will be handled again in new page
i--;
@@ -243,8 +243,7 @@ public class UnsafeSortDataRows {
}
rowPage.addRow(row, reUsableByteArrayDataOutputStream.get());
} catch (Exception e) {
- if (e.getMessage().contains("cannot handle this row. create new page"))
- {
+ if (e.getMessage().contains("cannot handle this row. create new page")) {
rowPage.makeCanAddFail();
addRow(row);
} else {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 041544b..f7e38b3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -103,7 +103,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
}
double intermediateMergeCostTime =
(System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
- LOGGER.info("============================== Intermediate Merge of " + fileConterConst
+ LOGGER.info("Intermediate Merge of " + fileConterConst
+ " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
} catch (Exception e) {
LOGGER.error("Problem while intermediate merging", e);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index ea12263..1b44cc6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -75,7 +75,8 @@ public class UnsafeIntermediateMerger {
this.rowPages = new ArrayList<UnsafeCarbonRowPage>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
this.mergedPages = new ArrayList<>();
this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(),
- new CarbonThreadFactory("UnsafeIntermediatePool:" + parameters.getTableName()));
+ new CarbonThreadFactory("UnsafeIntermediatePool:" + parameters.getTableName(),
+ true));
this.procFiles = new ArrayList<>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
this.mergerTask = new ArrayList<>();
@@ -182,7 +183,7 @@ public class UnsafeIntermediateMerger {
* @param spillDisk whether to spill the merged result to disk
*/
private void startIntermediateMerging(UnsafeCarbonRowPage[] rowPages, int totalRows,
- boolean spillDisk) throws CarbonSortKeyAndGroupByException {
+ boolean spillDisk) {
UnsafeInMemoryIntermediateDataMerger merger =
new UnsafeInMemoryIntermediateDataMerger(rowPages, totalRows, parameters, spillDisk);
mergedPages.add(merger);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 184248c..6345035 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -142,7 +142,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
} else {
executorService = Executors.newFixedThreadPool(iterators.length,
new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier()
- .getCarbonTableIdentifier().getTableName()));
+ .getCarbonTableIdentifier().getTableName(), true));
Future[] futures = new Future[iterators.length];
for (int i = 0; i < iterators.length; i++) {
futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i, dataHandler));
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index 7beca48..d1b1e76 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -115,7 +115,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
.recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID,
System.currentTimeMillis());
rangeExecutorService = Executors.newFixedThreadPool(iterators.length,
- new CarbonThreadFactory("WriterForwardPool: " + tableName));
+ new CarbonThreadFactory("WriterForwardPool: " + tableName, true));
List<Future<Void>> rangeExecutorServiceSubmitList = new ArrayList<>(iterators.length);
int i = 0;
// do this concurrently
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
index f540b3e..c44c3f5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -71,7 +71,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
rowParser = new RowParserImpl(getOutput(), configuration);
executorService = Executors.newCachedThreadPool(new CarbonThreadFactory(
"InputProcessorPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier()
- .getTableName()));
+ .getTableName(), true));
// if logger is enabled then raw data will be required.
this.isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(configuration);
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index 128547d..174d5d1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -112,7 +112,8 @@ public class SortDataRows {
CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
this.dataSorterAndWriterExecutorService = Executors
.newFixedThreadPool(parameters.getNumberOfCores(),
- new CarbonThreadFactory("SortDataRowPool:" + parameters.getTableName()));
+ new CarbonThreadFactory("SortDataRowPool:" + parameters.getTableName(),
+ true));
semaphore = new Semaphore(parameters.getNumberOfCores());
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
index 1f4f1e7..7079443 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
@@ -61,7 +61,8 @@ public class SortIntermediateFileMerger {
// processed file list
this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(),
- new CarbonThreadFactory("SafeIntermediateMergerPool:" + parameters.getTableName()));
+ new CarbonThreadFactory("SafeIntermediateMergerPool:" + parameters.getTableName(),
+ true));
mergerTask = new ArrayList<>();
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index eeea2ec..2ae90fa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -124,7 +124,8 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
this.compressorName = sortParameters.getSortTempCompressorName();
this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
this.executorService = Executors
- .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
+ .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName,
+ true));
this.convertToActualField = convertToActualField;
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 472f143..eb1b15d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -186,7 +186,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
}
this.executorService = Executors.newFixedThreadPool(1,
- new CarbonThreadFactory("CompleteHDFSBackendPool:" + this.model.getTableName()));
+ new CarbonThreadFactory("CompleteHDFSBackendPool:" + this.model.getTableName(),
+ true));
executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
// in case of compaction we will pass the cardinality.
this.localCardinality = this.model.getColCardinality();
@@ -208,7 +209,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
numberOfCores = model.getNumberOfCores() / 2;
}
fallbackExecutorService = Executors.newFixedThreadPool(numberOfCores, new CarbonThreadFactory(
- "FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId()));
+ "FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId(),
+ true));
}
}