You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/10/05 11:09:30 UTC
carbondata git commit: [CARBONDATA-1410] Fixed thread leak issue in
case of data loading
Repository: carbondata
Updated Branches:
refs/heads/master 7d3bf0503 -> a734add5a
[CARBONDATA-1410] Fixed thread leak issue in case of data loading
Problem: In case of data loading failure threads are not getting closed and its causing thread leak in long run, because of this OOM is coming
Solution: Close all the thread in case of failure , Success and killing
This closes #1401
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a734add5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a734add5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a734add5
Branch: refs/heads/master
Commit: a734add5a95790f207d21a2e0dcc4e1480d51932
Parents: 7d3bf05
Author: kumarvishal <ku...@gmail.com>
Authored: Mon Sep 25 18:24:03 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Oct 5 16:38:48 2017 +0530
----------------------------------------------------------------------
.../exception/CarbonDataWriterException.java | 9 +++
.../core/util/CarbonThreadFactory.java | 47 ++++++++++++
.../spark/rdd/NewCarbonDataLoadRDD.scala | 9 ++-
.../converter/impl/RowConverterImpl.java | 5 +-
.../sort/impl/ParallelReadMergeSorterImpl.java | 9 ++-
.../impl/UnsafeParallelReadMergeSorterImpl.java | 9 ++-
.../loading/sort/unsafe/UnsafeSortDataRows.java | 11 ++-
.../holder/UnsafeSortTempFileChunkHolder.java | 4 +-
.../merger/UnsafeIntermediateFileMerger.java | 40 ++++++----
.../unsafe/merger/UnsafeIntermediateMerger.java | 31 +++++---
.../UnsafeSingleThreadFinalSortFilesMerger.java | 3 +-
.../loading/steps/InputProcessorStepImpl.java | 5 +-
.../sort/sortdata/IntermediateFileMerger.java | 43 +++++++----
.../SingleThreadFinalSortFilesMerger.java | 81 ++++++++++++++------
.../processing/sort/sortdata/SortDataRows.java | 23 ++++--
.../sortdata/SortIntermediateFileMerger.java | 25 +++++-
.../sort/sortdata/SortTempFileChunkHolder.java | 11 ++-
.../store/CarbonFactDataHandlerColumnar.java | 19 ++++-
.../store/writer/AbstractFactDataWriter.java | 4 +-
.../writer/v3/CarbonFactDataWriterImplV3.java | 6 +-
.../processing/util/CarbonLoaderUtil.java | 4 +-
21 files changed, 308 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java b/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java
index 8f75ee8..f9f44a4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java
@@ -44,6 +44,15 @@ public class CarbonDataWriterException extends RuntimeException {
/**
* Constructor
*
+ * @param t exception.
+ */
+ public CarbonDataWriterException(Throwable t) {
+ super(t);
+ }
+
+ /**
+ * Constructor
+ *
* @param msg The error message for this exception.
*/
public CarbonDataWriterException(String msg, Throwable t) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/core/src/main/java/org/apache/carbondata/core/util/CarbonThreadFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonThreadFactory.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonThreadFactory.java
new file mode 100644
index 0000000..689365d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonThreadFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Carbon thread factory class
+ */
+public class CarbonThreadFactory implements ThreadFactory {
+
+ /**
+ * default thread factory
+ */
+ private ThreadFactory defaultFactory;
+
+ /**
+ * pool name
+ */
+ private String name;
+
+ public CarbonThreadFactory(String name) {
+ this.defaultFactory = Executors.defaultThreadFactory();
+ this.name = name;
+ }
+
+ @Override public Thread newThread(Runnable r) {
+ final Thread thread = defaultFactory.newThread(r);
+ thread.setName(name);
+ return thread;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 1d1b47a..49b708c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -299,8 +299,9 @@ class NewCarbonDataLoadRDD[K, V](
split.serializableHadoopSplit.value.getPartition.getUniqueID)
}
partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
-
- StandardLogService.setThreadName(partitionID, null)
+ StandardLogService.setThreadName(StandardLogService
+ .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
+ , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
partitionID, split.partitionBlocksDetail.length)
val readers =
@@ -328,7 +329,9 @@ class NewCarbonDataLoadRDD[K, V](
} else {
model = carbonLoadModel.getCopyWithPartition(partitionID)
}
- StandardLogService.setThreadName(blocksID, null)
+ StandardLogService.setThreadName(StandardLogService
+ .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
+ , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
val readers =
split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
readers.zipWithIndex.map { case (reader, index) =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index a4351ae..79c6d61 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.loading.BadRecordsLogger;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
@@ -117,7 +118,9 @@ public class RowConverterImpl implements RowConverter {
// for one pass load, start the dictionary client
if (configuration.getUseOnePass()) {
if (executorService == null) {
- executorService = Executors.newCachedThreadPool();
+ executorService = Executors.newCachedThreadPool(new CarbonThreadFactory(
+ "DictionaryClientPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier()
+ .getTableName()));
}
Future<DictionaryClient> result = executorService.submit(new Callable<DictionaryClient>() {
@Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index 6e43fcb..cefc97d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
@@ -59,6 +60,8 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
private AtomicLong rowCounter;
+ private ExecutorService executorService;
+
public ParallelReadMergeSorterImpl(AtomicLong rowCounter) {
this.rowCounter = rowCounter;
}
@@ -94,7 +97,8 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
} catch (CarbonSortKeyAndGroupByException e) {
throw new CarbonDataLoadingException(e);
}
- ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+ this.executorService = Executors.newFixedThreadPool(iterators.length,
+ new CarbonThreadFactory("SafeParallelSorterPool:" + sortParameters.getTableName()));
this.threadStatusObserver = new ThreadStatusObserver(executorService);
try {
@@ -147,6 +151,9 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
if (intermediateFileMerger != null) {
intermediateFileMerger.close();
}
+ if (null != executorService && !executorService.isShutdown()) {
+ executorService.shutdownNow();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index 1a2f704..c05c027 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
@@ -59,6 +60,8 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
private AtomicLong rowCounter;
+ private ExecutorService executorService;
+
public UnsafeParallelReadMergeSorterImpl(AtomicLong rowCounter) {
this.rowCounter = rowCounter;
}
@@ -82,7 +85,8 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
} catch (MemoryException e) {
throw new CarbonDataLoadingException(e);
}
- ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+ this.executorService = Executors.newFixedThreadPool(iterators.length,
+ new CarbonThreadFactory("UnsafeParallelSorterPool:" + sortParameters.getTableName()));
this.threadStatusObserver = new ThreadStatusObserver(executorService);
try {
@@ -131,6 +135,9 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
}
@Override public void close() {
+ if (null != executorService && !executorService.isShutdown()) {
+ executorService.shutdownNow();
+ }
unsafeIntermediateFileMerger.close();
finalMerger.clear();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 88b72aa..0210464 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
@@ -137,8 +138,9 @@ public class UnsafeSortDataRows {
// create new sort temp directory
CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
- this.dataSorterAndWriterExecutorService =
- Executors.newFixedThreadPool(parameters.getNumberOfCores());
+ this.dataSorterAndWriterExecutorService = Executors
+ .newFixedThreadPool(parameters.getNumberOfCores(),
+ new CarbonThreadFactory("UnsafeSortDataRowPool:" + parameters.getTableName()));
semaphore = new Semaphore(parameters.getNumberOfCores());
}
@@ -372,7 +374,8 @@ public class UnsafeSortDataRows {
+ System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
writeData(page, sortTempFile);
LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
- + " and write is: " + (System.currentTimeMillis() - startTime));
+ + " and write is: " + (System.currentTimeMillis() - startTime) + ": location:"
+ + sortTempFile);
page.freeMemory();
// add sort temp filename to and arrayList. When the list size reaches 20 then
// intermediate merging of sort temp files will be triggered
@@ -395,7 +398,7 @@ public class UnsafeSortDataRows {
page.getBuffer().loadToUnsafe();
unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(page);
LOGGER.info(
- "Time taken to sort row page with size" + page.getBuffer().getActualSize() + "is: "
+ "Time taken to sort row page with size: " + page.getBuffer().getActualSize() + "is: "
+ (System.currentTimeMillis() - startTime));
}
} catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 331b9db..5fed2ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -383,7 +383,9 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
*/
public void close() {
CarbonUtil.closeStreams(stream);
- executorService.shutdown();
+ if (null != executorService && !executorService.isShutdown()) {
+ executorService.shutdownNow();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 7f98d72..4303ec8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -40,7 +41,7 @@ import org.apache.carbondata.processing.sort.sortdata.SortParameters;
import org.apache.carbondata.processing.sort.sortdata.TempSortFileWriter;
import org.apache.carbondata.processing.sort.sortdata.TempSortFileWriterFactory;
-public class UnsafeIntermediateFileMerger implements Runnable {
+public class UnsafeIntermediateFileMerger implements Callable<Void> {
/**
* LOGGER
*/
@@ -84,6 +85,8 @@ public class UnsafeIntermediateFileMerger implements Runnable {
private ByteBuffer rowData;
+ private Throwable throwable;
+
/**
* IntermediateFileMerger Constructor
*/
@@ -99,11 +102,9 @@ public class UnsafeIntermediateFileMerger implements Runnable {
rowData = ByteBuffer.allocate(2 * 1024 * 1024);
}
- @Override
- public void run() {
+ @Override public Void call() throws Exception {
long intermediateMergeStartTime = System.currentTimeMillis();
int fileConterConst = fileCounter;
- boolean isFailed = false;
try {
startSorting();
initialize();
@@ -116,24 +117,30 @@ public class UnsafeIntermediateFileMerger implements Runnable {
+ " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
} catch (Exception e) {
LOGGER.error(e, "Problem while intermediate merging");
- isFailed = true;
+ clear();
+ throwable = e;
} finally {
CarbonUtil.closeStreams(this.stream);
if (null != writer) {
writer.finish();
}
- if (!isFailed) {
+ if (null == throwable) {
try {
finish();
} catch (CarbonSortKeyAndGroupByException e) {
LOGGER.error(e, "Problem while deleting the merge file");
+ throwable = e;
}
} else {
- if (outPutFile.delete()) {
+ if (!outPutFile.delete()) {
LOGGER.error("Problem while deleting the merge file");
}
}
}
+ if (null != throwable) {
+ throw new CarbonSortKeyAndGroupByException(throwable);
+ }
+ return null;
}
/**
@@ -351,12 +358,7 @@ public class UnsafeIntermediateFileMerger implements Runnable {
}
private void finish() throws CarbonSortKeyAndGroupByException {
- if (recordHolderHeap != null) {
- int size = recordHolderHeap.size();
- for (int i = 0; i < size; i++) {
- recordHolderHeap.poll().close();
- }
- }
+ clear();
try {
CarbonUtil.deleteFiles(intermediateFiles);
rowData.clear();
@@ -364,4 +366,16 @@ public class UnsafeIntermediateFileMerger implements Runnable {
throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
}
}
+
+ private void clear() {
+ if (null != recordHolderHeap) {
+ SortTempChunkHolder sortTempChunkHolder;
+ while (!recordHolderHeap.isEmpty()) {
+ sortTempChunkHolder = recordHolderHeap.poll();
+ if (null != sortTempChunkHolder) {
+ sortTempChunkHolder.close();
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index c774d8f..0d24e01 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -20,14 +20,16 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
@@ -55,20 +57,19 @@ public class UnsafeIntermediateMerger {
private final Object lockObject = new Object();
- private boolean offHeap;
-
private List<File> procFiles;
+ private List<Future<Void>> mergerTask;
+
public UnsafeIntermediateMerger(SortParameters parameters) {
this.parameters = parameters;
// processed file list
this.rowPages = new ArrayList<UnsafeCarbonRowPage>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
this.mergedPages = new ArrayList<>();
- this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores());
- this.offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
- CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
+ this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(),
+ new CarbonThreadFactory("UnsafeIntermediatePool:" + parameters.getTableName()));
this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ this.mergerTask = new ArrayList<>();
}
public void addDataChunkToMerge(UnsafeCarbonRowPage rowPage) {
@@ -116,7 +117,7 @@ public class UnsafeIntermediateMerger {
.nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
UnsafeIntermediateFileMerger merger =
new UnsafeIntermediateFileMerger(parameters, intermediateFiles, file);
- executorService.execute(merger);
+ mergerTask.add(executorService.submit(merger));
}
public void startInmemoryMergingIfPossible() throws CarbonSortKeyAndGroupByException {
@@ -167,16 +168,28 @@ public class UnsafeIntermediateMerger {
} catch (InterruptedException e) {
throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
}
+ checkForFailure();
}
public void close() {
- if (executorService.isShutdown()) {
+ if (!executorService.isShutdown()) {
executorService.shutdownNow();
}
rowPages.clear();
rowPages = null;
}
+ private void checkForFailure() throws CarbonSortKeyAndGroupByException {
+ for (int i = 0; i < mergerTask.size(); i++) {
+ try {
+ mergerTask.get(i).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error(e, e.getMessage());
+ throw new CarbonSortKeyAndGroupByException(e.getMessage(), e);
+ }
+ }
+ }
+
public List<UnsafeCarbonRowPage> getRowPages() {
return rowPages;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 32b31d7..eb38efe 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -97,6 +97,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
LOGGER.info("No files to merge sort");
return;
}
+ LOGGER.info("Starting final merger");
LOGGER.info("Number of row pages: " + this.fileCounter);
// create record holder heap
@@ -143,7 +144,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
} catch (Exception e) {
LOGGER.error(e);
- throw new CarbonDataWriterException(e.getMessage());
+ throw new CarbonDataWriterException(e);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
index 70a1254..ae7ece1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.DataField;
@@ -64,7 +65,9 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
@Override public void initialize() throws IOException {
super.initialize();
rowParser = new RowParserImpl(getOutput(), configuration);
- executorService = Executors.newCachedThreadPool();
+ executorService = Executors.newCachedThreadPool(new CarbonThreadFactory(
+ "InputProcessorPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier()
+ .getTableName()));
}
@Override public Iterator<CarbonRowBatch>[] execute() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index ffe6fb6..d4a8dd6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -25,6 +25,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.AbstractQueue;
import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -33,7 +34,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-public class IntermediateFileMerger implements Runnable {
+public class IntermediateFileMerger implements Callable<Void> {
/**
* LOGGER
*/
@@ -88,6 +89,8 @@ public class IntermediateFileMerger implements Runnable {
private boolean[] noDictionarycolumnMapping;
+ private Throwable throwable;
+
/**
* IntermediateFileMerger Constructor
*/
@@ -100,11 +103,9 @@ public class IntermediateFileMerger implements Runnable {
noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
}
- @Override
- public void run() {
+ @Override public Void call() throws Exception {
long intermediateMergeStartTime = System.currentTimeMillis();
int fileConterConst = fileCounter;
- boolean isFailed = false;
try {
startSorting();
initialize();
@@ -129,25 +130,31 @@ public class IntermediateFileMerger implements Runnable {
" Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
} catch (Exception e) {
LOGGER.error(e, "Problem while intermediate merging");
- isFailed = true;
+ clear();
+ throwable = e;
} finally {
records = null;
CarbonUtil.closeStreams(this.stream);
if (null != writer) {
writer.finish();
}
- if (!isFailed) {
+ if (null == throwable) {
try {
finish();
} catch (CarbonSortKeyAndGroupByException e) {
LOGGER.error(e, "Problem while deleting the merge file");
+ throwable = e;
}
} else {
- if (outPutFile.delete()) {
+ if (!outPutFile.delete()) {
LOGGER.error("Problem while deleting the merge file");
}
}
}
+ if (null != throwable) {
+ throw new CarbonSortKeyAndGroupByException(throwable);
+ }
+ return null;
}
/**
@@ -251,7 +258,7 @@ public class IntermediateFileMerger implements Runnable {
mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
mergerParameters.getMeasureDataType(),
mergerParameters.getNoDictionaryDimnesionColumn(),
- mergerParameters.getNoDictionarySortColumn());
+ mergerParameters.getNoDictionarySortColumn(), mergerParameters.getTableName());
// initialize
sortTempFileChunkHolder.initialize();
@@ -370,16 +377,24 @@ public class IntermediateFileMerger implements Runnable {
}
private void finish() throws CarbonSortKeyAndGroupByException {
- if (recordHolderHeap != null) {
- int size = recordHolderHeap.size();
- for (int i = 0; i < size; i++) {
- recordHolderHeap.poll().closeStream();
- }
- }
+ clear();
try {
CarbonUtil.deleteFiles(intermediateFiles);
} catch (IOException e) {
throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
}
}
+
+ private void clear() {
+ if (recordHolderHeap != null) {
+ SortTempFileChunkHolder sortTempFileChunkHolder;
+ while (!recordHolderHeap.isEmpty()) {
+ sortTempFileChunkHolder = recordHolderHeap.poll();
+ if (null != sortTempFileChunkHolder) {
+ sortTempFileChunkHolder.closeStream();
+ }
+ }
+ }
+ recordHolderHeap = null;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index 6d6ff94..db4c771 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -24,8 +24,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.CarbonIterator;
@@ -105,6 +108,12 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
private boolean[] isNoDictionarySortColumn;
+ private int maxThreadForSorting;
+
+ private ExecutorService executorService;
+
+ private List<Future<Void>> mergerTask;
+
public SingleThreadFinalSortFilesMerger(String[] tempFileLocation, String tableName,
int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount,
DataType[] type, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) {
@@ -117,6 +126,15 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
this.noDictionaryCount = noDictionaryCount;
this.isNoDictionaryColumn = isNoDictionaryColumn;
this.isNoDictionarySortColumn = isNoDictionarySortColumn;
+ try {
+ maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD,
+ CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE));
+ } catch (NumberFormatException e) {
+ maxThreadForSorting =
+ Integer.parseInt(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE);
+ }
+ this.mergerTask = new ArrayList<>();
}
/**
@@ -174,6 +192,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
.getFileBufferSize(this.fileCounter, CarbonProperties.getInstance(),
CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ LOGGER.info("Started Final Merge");
+
LOGGER.info("Number of temp file: " + this.fileCounter);
LOGGER.info("File Buffer Size: " + this.fileBufferSize);
@@ -183,51 +203,51 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
// iterate over file list and create chunk holder and add to heap
LOGGER.info("Started adding first record from each file");
- int maxThreadForSorting = 0;
- try {
- maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD,
- CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE));
- } catch (NumberFormatException e) {
- maxThreadForSorting =
- Integer.parseInt(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE);
- }
- ExecutorService service = Executors.newFixedThreadPool(maxThreadForSorting);
+ this.executorService = Executors.newFixedThreadPool(maxThreadForSorting);
for (final File tempFile : files) {
- Runnable runnable = new Runnable() {
- @Override public void run() {
-
+ Callable<Void> callable = new Callable<Void>() {
+ @Override public Void call() throws CarbonSortKeyAndGroupByException {
// create chunk holder
SortTempFileChunkHolder sortTempFileChunkHolder =
new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
measureCount, fileBufferSize, noDictionaryCount, measureDataType,
- isNoDictionaryColumn, isNoDictionarySortColumn);
+ isNoDictionaryColumn, isNoDictionarySortColumn, tableName);
try {
// initialize
sortTempFileChunkHolder.initialize();
sortTempFileChunkHolder.readRow();
} catch (CarbonSortKeyAndGroupByException ex) {
- LOGGER.error(ex);
+ sortTempFileChunkHolder.closeStream();
+ notifyFailure(ex);
}
-
synchronized (LOCKOBJECT) {
recordHolderHeapLocal.add(sortTempFileChunkHolder);
}
+ return null;
}
};
- service.execute(runnable);
+ mergerTask.add(executorService.submit(callable));
}
- service.shutdown();
-
+ executorService.shutdown();
try {
- service.awaitTermination(2, TimeUnit.HOURS);
+ executorService.awaitTermination(2, TimeUnit.HOURS);
} catch (Exception e) {
throw new CarbonDataWriterException(e.getMessage(), e);
}
+ checkFailure();
+ LOGGER.info("final merger Heap Size" + this.recordHolderHeapLocal.size());
+ }
- LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
+ private void checkFailure() {
+ for (int i = 0; i < mergerTask.size(); i++) {
+ try {
+ mergerTask.get(i).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new CarbonDataWriterException(e);
+ }
+ }
}
/**
@@ -239,6 +259,11 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
this.recordHolderHeapLocal = new PriorityQueue<SortTempFileChunkHolder>(fileCounter);
}
+ private synchronized void notifyFailure(Throwable throwable) {
+ close();
+ LOGGER.error(throwable);
+ }
+
/**
* This method will be used to get the sorted row
*
@@ -284,6 +309,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
try {
poll.readRow();
} catch (CarbonSortKeyAndGroupByException e) {
+ close();
throw new CarbonDataWriterException(e.getMessage(), e);
}
@@ -304,9 +330,18 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
return this.fileCounter > 0;
}
- public void clear() {
+ public void close() {
+ if (null != executorService && !executorService.isShutdown()) {
+ executorService.shutdownNow();
+ }
if (null != recordHolderHeapLocal) {
- recordHolderHeapLocal = null;
+ SortTempFileChunkHolder sortTempFileChunkHolder;
+ while (!recordHolderHeapLocal.isEmpty()) {
+ sortTempFileChunkHolder = recordHolderHeapLocal.poll();
+ if (null != sortTempFileChunkHolder) {
+ sortTempFileChunkHolder.closeStream();
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index fc744a6..11df276 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
@@ -102,8 +103,9 @@ public class SortDataRows {
// create new sort temp directory
CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
- this.dataSorterAndWriterExecutorService =
- Executors.newFixedThreadPool(parameters.getNumberOfCores());
+ this.dataSorterAndWriterExecutorService = Executors
+ .newFixedThreadPool(parameters.getNumberOfCores(),
+ new CarbonThreadFactory("SortDataRowPool:" + parameters.getTableName()));
semaphore = new Semaphore(parameters.getNumberOfCores());
}
@@ -128,9 +130,9 @@ public class SortDataRows {
semaphore.acquire();
dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal));
} catch (InterruptedException e) {
- LOGGER.error(
- "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
- throw new CarbonSortKeyAndGroupByException(e.getMessage());
+ LOGGER.error(e,
+ "exception occurred while trying to acquire a semaphore lock: ");
+ throw new CarbonSortKeyAndGroupByException(e);
}
// create the new holder Array
this.recordHolderList = new Object[this.sortBufferSize][];
@@ -379,14 +381,21 @@ public class SortDataRows {
* @throws CarbonSortKeyAndGroupByException
*/
public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException {
- dataSorterAndWriterExecutorService.shutdownNow();
- intermediateFileMerger.close();
+ close();
parameters.getObserver().setFailed(true);
LOGGER.error(exception);
throw new CarbonSortKeyAndGroupByException(exception);
}
}
+ public void close() {
+ if (null != dataSorterAndWriterExecutorService && !dataSorterAndWriterExecutorService
+ .isShutdown()) {
+ dataSorterAndWriterExecutorService.shutdownNow();
+ }
+ intermediateFileMerger.close();
+ }
+
/**
* This class is responsible for sorting and writing the object
* array which holds the records equal to given array size
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
index d234ce2..9c995a5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
@@ -20,13 +20,16 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
/**
@@ -50,11 +53,15 @@ public class SortIntermediateFileMerger {
private final Object lockObject = new Object();
+ private List<Future<Void>> mergerTask;
+
public SortIntermediateFileMerger(SortParameters parameters) {
this.parameters = parameters;
// processed file list
this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores());
+ this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(),
+ new CarbonThreadFactory("SafeIntermediateMergerPool:" + parameters.getTableName()));
+ mergerTask = new ArrayList<>();
}
public void addFileToMerge(File sortTempFile) {
@@ -91,7 +98,7 @@ public class SortIntermediateFileMerger {
chosenTempDir + File.separator + parameters.getTableName() + System
.nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file);
- executorService.execute(merger);
+ mergerTask.add(executorService.submit(merger));
}
public void finish() throws CarbonSortKeyAndGroupByException {
@@ -103,10 +110,22 @@ public class SortIntermediateFileMerger {
}
procFiles.clear();
procFiles = null;
+ checkForFailure();
+ }
+
+ private void checkForFailure() throws CarbonSortKeyAndGroupByException {
+ for (int i = 0; i < mergerTask.size(); i++) {
+ try {
+ mergerTask.get(i).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error(e, e.getMessage());
+ throw new CarbonSortKeyAndGroupByException(e);
+ }
+ }
}
public void close() {
- if (executorService.isShutdown()) {
+ if (!executorService.isShutdown()) {
executorService.shutdownNow();
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index c4b0b31..3e56605 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.NonDictionaryUtil;
@@ -153,7 +154,8 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
*/
public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
int measureCount, int fileBufferSize, int noDictionaryCount, DataType[] aggType,
- boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn) {
+ boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn,
+ String tableName) {
// set temp file
this.tempFile = tempFile;
@@ -165,7 +167,8 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
this.noDictionaryCount = noDictionaryCount;
// set mdkey length
this.fileBufferSize = fileBufferSize;
- this.executorService = Executors.newFixedThreadPool(1);
+ this.executorService = Executors
+ .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
this.aggType = aggType;
this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn;
@@ -407,7 +410,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
*/
public void closeStream() {
CarbonUtil.closeStreams(stream);
- executorService.shutdown();
+ if (null != executorService) {
+ executorService.shutdownNow();
+ }
this.backupBuffer = null;
this.currentBuffer = null;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 2c275bf..78f1637 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
@@ -238,11 +239,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
blockletProcessingCount = new AtomicInteger(0);
- producerExecutorService = Executors.newFixedThreadPool(numberOfCores);
+ producerExecutorService = Executors.newFixedThreadPool(numberOfCores,
+ new CarbonThreadFactory("ProducerPool:" + model.getTableName()));
producerExecutorServiceTaskList =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
LOGGER.info("Initializing writer executors");
- consumerExecutorService = Executors.newFixedThreadPool(1);
+ consumerExecutorService = Executors
+ .newFixedThreadPool(1, new CarbonThreadFactory("ConsumerPool:" + model.getTableName()));
consumerExecutorServiceTaskList = new ArrayList<>(1);
semaphore = new Semaphore(numberOfCores);
tablePageList = new TablePageList();
@@ -357,12 +360,20 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
public void finish() throws CarbonDataWriterException {
// still some data is present in stores if entryCount is more
// than 0
+ if (null == dataWriter) {
+ return;
+ }
+ if (producerExecutorService.isShutdown()) {
+ return;
+ }
+ LOGGER.info("Started Finish Operation");
try {
semaphore.acquire();
producerExecutorServiceTaskList.add(producerExecutorService
.submit(new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, true)));
blockletProcessingCount.incrementAndGet();
processedDataCount += entryCount;
+ LOGGER.info("Total Number Of records added to store: " + processedDataCount);
closeWriterExecutionService(producerExecutorService);
processWriteTaskSubmitList(producerExecutorServiceTaskList);
processingComplete = true;
@@ -666,6 +677,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
return tablePage;
}
+ /**
+ * @param tablePage
+ * @param index
+ */
public synchronized void put(TablePage tablePage, int index) {
tablePages[index] = tablePage;
// notify the consumer thread when index at which object is to be inserted
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index acb3b3b..972e414 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -52,6 +52,7 @@ import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonMergerUtil;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
@@ -174,7 +175,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " +
blockSizeThreshold);
- this.executorService = Executors.newFixedThreadPool(1);
+ this.executorService = Executors.newFixedThreadPool(1,
+ new CarbonThreadFactory("LocalToHDFSCopyPool:" + dataWriterVo.getTableName()));
executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
// in case of compaction we will pass the cardinality.
this.localCardinality = dataWriterVo.getColCardinality();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index 70a8703..d8ae8ff 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -99,6 +99,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
buffer.flip();
channel.write(buffer);
} catch (IOException e) {
+ LOGGER.error(e, "Problem while writing the carbon file");
throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
}
}
@@ -184,7 +185,8 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
}
pageId = 0;
} catch (IOException e) {
- throw new CarbonDataWriterException("Problem when writing file", e);
+ LOGGER.error(e, "Problem while writing file");
+ throw new CarbonDataWriterException("Problem while writing file", e);
}
// clear the data holder
blockletDataHolder.clear();
@@ -213,6 +215,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
measureStartIndex++;
}
} catch (IOException e) {
+ LOGGER.error(e, "Problem while getting the data chunks");
throw new CarbonDataWriterException("Problem while getting the data chunks", e);
}
return size;
@@ -346,6 +349,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
try {
writeIndexFile();
} catch (IOException e) {
+ LOGGER.error(e, "Problem while writing the index file");
throw new CarbonDataWriterException("Problem while writing the index file", e);
}
closeExecutorService();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a734add5/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 8681269..0b88684 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -67,6 +67,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -216,7 +217,8 @@ public final class CarbonLoaderUtil {
throw new RuntimeException("Store location not set for the key " + tempLocationKey);
}
// submit local folder clean up in another thread so that main thread execution is not blocked
- ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1);
+ ExecutorService localFolderDeletionService = Executors
+ .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName));
try {
localFolderDeletionService.submit(new Callable<Void>() {
@Override public Void call() throws Exception {