You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by xu...@apache.org on 2018/11/05 01:20:36 UTC
carbondata git commit: [HOTFIX] Throw original exception in thread
pool
Repository: carbondata
Updated Branches:
refs/heads/master 934216de1 -> 469c52f5d
[HOTFIX] Throw original exception in thread pool
If there are exception occurs in the Callable.run in the thread pool, it should throw the original exception instead of throw a new one, which makes it hard for debugging.
This closes #2887
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/469c52f5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/469c52f5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/469c52f5
Branch: refs/heads/master
Commit: 469c52f5d4c18579e2a6ed4c3bb35691cf01937b
Parents: 934216d
Author: Jacky Li <ja...@qq.com>
Authored: Wed Oct 31 20:16:11 2018 +0800
Committer: xuchuanyin <xu...@hust.edu.cn>
Committed: Mon Nov 5 09:15:53 2018 +0800
----------------------------------------------------------------------
.../spark/rdd/NewCarbonDataLoadRDD.scala | 2 +
.../carbondata/spark/rdd/PartitionDropper.scala | 7 +-
.../spark/rdd/PartitionSplitter.scala | 4 +-
.../steps/DataWriterProcessorStepImpl.java | 68 ++++++--------------
4 files changed, 27 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/469c52f5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 041dc1c..0b6a2a9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -156,6 +156,8 @@ class NewCarbonDataLoadRDD[K, V](
logInfo("Bad Record Found")
case e: Exception =>
loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
+ executionErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+ executionErrors.errorMsg = e.getMessage
logInfo("DataLoad failure", e)
LOGGER.error(e)
throw e
http://git-wip-us.apache.org/repos/asf/carbondata/blob/469c52f5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
index 6911b0b..353a478 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
@@ -102,8 +102,8 @@ object PartitionDropper {
Seq(partitionId, targetPartitionId).toList, dbName,
tableName, partitionInfo)
} catch {
- case e: IOException => sys.error(s"Exception while delete original carbon files " +
- e.getMessage)
+ case e: IOException =>
+ throw new IOException("Exception while delete original carbon files ", e)
}
Audit.log(logger, s"Drop Partition request completed for table " +
s"${ dbName }.${ tableName }")
@@ -111,7 +111,8 @@ object PartitionDropper {
s"${ dbName }.${ tableName }")
}
} catch {
- case e: Exception => sys.error(s"Exception in dropping partition action: ${ e.getMessage }")
+ case e: Exception =>
+ throw new RuntimeException("Exception in dropping partition action", e)
}
} else {
PartitionUtils.deleteOriginalCarbonFile(alterPartitionModel, absoluteTableIdentifier,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/469c52f5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
index ca9f049..369ad51 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
@@ -87,8 +87,8 @@ object PartitionSplitter {
deleteOriginalCarbonFile(alterPartitionModel, absoluteTableIdentifier,
Seq(partitionId).toList, databaseName, tableName, partitionInfo)
} catch {
- case e: IOException => sys.error(s"Exception while delete original carbon files " +
- e.getMessage)
+ case e: IOException =>
+ throw new IOException("Exception while delete original carbon files ", e)
}
Audit.log(logger, s"Add/Split Partition request completed for table " +
s"${ databaseName }.${ tableName }")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/469c52f5/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index 1657476..2dc3275 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.util.CarbonThreadFactory;
@@ -122,7 +121,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
// do this concurrently
for (Iterator<CarbonRowBatch> iterator : iterators) {
rangeExecutorServiceSubmitList.add(
- rangeExecutorService.submit(new WriterForwarder(iterator, tableIdentifier, i)));
+ rangeExecutorService.submit(new WriterForwarder(iterator, i)));
i++;
}
try {
@@ -131,15 +130,14 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
for (int j = 0; j < rangeExecutorServiceSubmitList.size(); j++) {
rangeExecutorServiceSubmitList.get(j).get();
}
- } catch (InterruptedException | ExecutionException e) {
+ } catch (InterruptedException e) {
throw new CarbonDataWriterException(e);
+ } catch (ExecutionException e) {
+ throw new CarbonDataWriterException(e.getCause());
}
} catch (CarbonDataWriterException e) {
- LOGGER.error("Failed for table: " + tableName + " in DataWriterProcessorStepImpl", e);
- throw new CarbonDataLoadingException(
- "Error while initializing data handler : " + e.getMessage());
+ throw new CarbonDataLoadingException("Error while initializing writer: " + e.getMessage(), e);
} catch (Exception e) {
- LOGGER.error("Failed for table: " + tableName + " in DataWriterProcessorStepImpl", e);
throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
}
return null;
@@ -154,19 +152,14 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
*/
private final class WriterForwarder implements Callable<Void> {
private Iterator<CarbonRowBatch> insideRangeIterator;
- private CarbonTableIdentifier tableIdentifier;
private int rangeId;
- public WriterForwarder(Iterator<CarbonRowBatch> insideRangeIterator,
- CarbonTableIdentifier tableIdentifier, int rangeId) {
+ WriterForwarder(Iterator<CarbonRowBatch> insideRangeIterator, int rangeId) {
this.insideRangeIterator = insideRangeIterator;
- this.tableIdentifier = tableIdentifier;
this.rangeId = rangeId;
}
- @Override public Void call() throws Exception {
- LOGGER.info("Process writer forward for table " + tableIdentifier.getTableName()
- + ", range: " + rangeId);
+ @Override public Void call() {
processRange(insideRangeIterator, rangeId);
return null;
}
@@ -184,8 +177,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
while (insideRangeIterator.hasNext()) {
if (rowsNotExist) {
rowsNotExist = false;
- dataHandler = CarbonFactHandlerFactory
- .createCarbonFactHandler(model);
+ dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
carbonFactHandlers.add(dataHandler);
dataHandler.initialise();
}
@@ -201,12 +193,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
CarbonTableIdentifier tableIdentifier =
configuration.getTableIdentifier().getCarbonTableIdentifier();
String tableName = tableIdentifier.getTableName();
-
- try {
- dataHandler.finish();
- } catch (Exception e) {
- LOGGER.error("Failed for table: " + tableName + " in finishing data handler", e);
- }
+ dataHandler.finish();
LOGGER.info("Record Processed For table: " + tableName);
String logMessage =
"Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + ": Write: "
@@ -222,41 +209,24 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
System.currentTimeMillis());
}
- private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
+ private void processingComplete(CarbonFactHandler dataHandler) {
if (null != dataHandler) {
- try {
- dataHandler.closeHandler();
- } catch (CarbonDataWriterException e) {
- LOGGER.error(e.getMessage(), e);
- throw new CarbonDataLoadingException(e.getMessage(), e);
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
- }
+ dataHandler.closeHandler();
}
}
- private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler)
- throws CarbonDataLoadingException {
- try {
- while (batch.hasNext()) {
- CarbonRow row = batch.next();
- dataHandler.addDataToStore(row);
- readCounter++;
- }
- } catch (Exception e) {
- throw new CarbonDataLoadingException(e);
+ private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler) {
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
+ dataHandler.addDataToStore(row);
+ readCounter++;
}
rowCounter.getAndAdd(batch.getSize());
}
- public void processRow(CarbonRow row, CarbonFactHandler dataHandler) throws KeyGenException {
- try {
- readCounter++;
- dataHandler.addDataToStore(row);
- } catch (Exception e) {
- throw new CarbonDataLoadingException(e);
- }
+ public void processRow(CarbonRow row, CarbonFactHandler dataHandler) {
+ readCounter++;
+ dataHandler.addDataToStore(row);
rowCounter.getAndAdd(1);
}